Skip to main content

quipu_core/
query.rs

1use crate::error::{Error, Result};
2use crate::id::Uid;
3use crate::model::{Content, StoredValue, Value};
4use crate::storage::Position;
5use base64::engine::general_purpose::URL_SAFE_NO_PAD;
6use base64::Engine;
7use serde::{Deserialize, Serialize};
8use std::collections::BTreeMap;
9
10/// How a [`TargetFilter`] compares the probe value against stored values.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
12#[serde(rename_all = "snake_case")]
13pub enum MatchMode {
14    #[default]
15    Exact,
16    /// Case-insensitive exact match. Plain fields are scanned; protected
17    /// fields need [`crate::schema::FieldIndex::Exact`] declared (the
18    /// lowercased token digest is looked up — no false positives).
19    ExactCi,
20    /// Case-insensitive prefix match. Plain fields are scanned; protected
21    /// fields need [`crate::schema::FieldIndex::Prefix`] covering the probe
22    /// length (prefix tokens are exact — no false positives).
23    Prefix,
24    /// LIKE-style substring match. Works on plain fields (in-memory scan) and
25    /// on RSA fields (values are decrypted with the private key and cached
26    /// per immutable version, so each value is decrypted at most once per
27    /// process). One-way hashed fields (Sha256/Hmac) cannot be
28    /// substring-scanned — the plaintext is never stored — unless they
29    /// declare [`crate::schema::FieldIndex::Ngram`], which matches candidate
30    /// digests instead (case-insensitive, may include false positives). An
31    /// Ngram index also narrows RSA fields to candidates before decryption.
32    Contains,
33}
34
35/// Search condition against a target entity's registry.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct TargetFilter {
38    pub entity_type: String,
39    pub field: String,
40    pub value: Value,
41    /// `true`: match if the entity carried this value in *any* version (so a
42    /// renamed entity is still found by its old name, and a search by the
43    /// current name also returns logs written before the rename).
44    /// `false`: only entities whose latest, non-deleted version matches.
45    #[serde(default = "default_include_past")]
46    pub include_past: bool,
47    #[serde(default)]
48    pub mode: MatchMode,
49}
50
/// Serde default for [`TargetFilter::include_past`] when the field is absent
/// from the input: search every version, the same as [`TargetFilter::exact`].
const fn default_include_past() -> bool {
    true
}
54
55impl TargetFilter {
56    /// Exact match, including past versions (the usual audit question).
57    pub fn exact(entity_type: impl Into<String>, field: impl Into<String>, value: Value) -> Self {
58        Self {
59            entity_type: entity_type.into(),
60            field: field.into(),
61            value,
62            include_past: true,
63            mode: MatchMode::Exact,
64        }
65    }
66
67    /// Switch to substring (LIKE) matching — see [`MatchMode::Contains`].
68    pub fn contains(mut self) -> Self {
69        self.mode = MatchMode::Contains;
70        self
71    }
72
73    /// Switch to case-insensitive exact matching — see [`MatchMode::ExactCi`].
74    pub fn exact_ci(mut self) -> Self {
75        self.mode = MatchMode::ExactCi;
76        self
77    }
78
79    /// Switch to prefix matching — see [`MatchMode::Prefix`].
80    pub fn prefix(mut self) -> Self {
81        self.mode = MatchMode::Prefix;
82        self
83    }
84
85    /// Only match entities whose latest, non-deleted version matches.
86    pub fn latest_only(mut self) -> Self {
87        self.include_past = false;
88        self
89    }
90}
91
92/// Result order of a log query, by record append position (which is also
93/// arrival order — within one store, positions grow monotonically).
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
95#[serde(rename_all = "snake_case")]
96pub enum Order {
97    /// Oldest first.
98    Asc,
99    /// Newest first — the audit-UI default ("what just happened?"), and the
100    /// order under which `limit` means "the latest N".
101    #[default]
102    Desc,
103}
104
105/// Declarative log query. All set conditions are AND-ed.
106#[derive(Debug, Clone, Default, Serialize, Deserialize)]
107#[serde(default)]
108pub struct LogQuery {
109    /// Inclusive UTC-micros range.
110    pub from_micros: Option<u64>,
111    pub to_micros: Option<u64>,
112    pub method: Option<String>,
113    pub url_prefix: Option<String>,
114    /// Filter by actor registry attributes (same semantics as target filters,
115    /// applied to the actor's registry).
116    pub actor: Option<TargetFilter>,
117    /// Target conditions, AND-ed: a log matches only if it touches an entity
118    /// matching *every* filter ("logs that touched X *and* Y").
119    pub targets: Vec<TargetFilter>,
120    /// Custom-column equality conditions.
121    pub custom: BTreeMap<String, Value>,
122    /// Page size. With [`Order::Desc`] (the default) this means "the latest
123    /// N matches"; the page's `next_cursor` continues into older records.
124    pub limit: Option<usize>,
125    /// Scan/result order. Defaults to newest-first.
126    pub order: Order,
127    /// Opaque continuation token from a previous page's
128    /// [`QueryPage::next_cursor`]. The rest of the query (filters, order)
129    /// must stay identical between pages — the cursor only encodes *where*
130    /// the previous page stopped, not *what* it matched.
131    pub cursor: Option<String>,
132}
133
134/// One page of query results (see [`crate::ReadSnapshot::query_page`]).
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct QueryPage {
137    pub logs: Vec<LogView>,
138    /// Present when more matches remain past this page; feed it back via
139    /// [`LogQuery::cursor`] to continue. `None` (absent on the wire) on the
140    /// final page.
141    #[serde(default, skip_serializing_if = "Option::is_none")]
142    pub next_cursor: Option<String>,
143    /// Log segment files this query actually opened — observability for
144    /// time-range pruning (a narrow window over a long history should open
145    /// far fewer segments than the table holds).
146    pub segments_scanned: u64,
147}
148
/// Version tag written to the first byte of every query cursor.
///
/// Cursor wire format before base64url (no padding) encoding,
/// [`CURSOR_LEN`] bytes in total:
///
/// | bytes    | content                                                    |
/// |----------|------------------------------------------------------------|
/// | `0`      | format version, [`CURSOR_V1`]                              |
/// | `1`      | sort order: `1` for [`Order::Desc`], `0` for [`Order::Asc`] |
/// | `2..10`  | segment seq of the last returned record, little-endian     |
/// | `10..18` | record idx of the last returned record, little-endian      |
///
/// Positions are physical and append-only, so a cursor stays valid across
/// snapshots:
///
/// - Under [`Order::Asc`], records appended after the cursor was issued sort
///   after it and show up on later pages.
/// - Under [`Order::Desc`], the continuation walks toward older records, so
///   the pages it yields are unaffected by anything appended since.
///
/// Retention only removes whole old segments, which scans skip.
const CURSOR_V1: u8 = 1;
/// Encoded cursor size: version byte + order byte + two 8-byte position words.
const CURSOR_LEN: usize = 1 + 1 + 8 + 8;
157
158pub(crate) fn encode_cursor(order: Order, pos: Position) -> String {
159    let mut b = [0u8; CURSOR_LEN];
160    b[0] = CURSOR_V1;
161    b[1] = (order == Order::Desc) as u8;
162    b[2..10].copy_from_slice(&pos.seq.to_le_bytes());
163    b[10..18].copy_from_slice(&pos.idx.to_le_bytes());
164    URL_SAFE_NO_PAD.encode(b)
165}
166
167pub(crate) fn decode_cursor(cursor: &str, order: Order) -> Result<Position> {
168    let bytes = URL_SAFE_NO_PAD
169        .decode(cursor)
170        .map_err(|_| Error::InvalidCursor("not a query cursor".into()))?;
171    let b: [u8; CURSOR_LEN] = bytes
172        .try_into()
173        .map_err(|_| Error::InvalidCursor("not a query cursor".into()))?;
174    if b[0] != CURSOR_V1 {
175        return Err(Error::InvalidCursor(format!(
176            "unsupported cursor version {}",
177            b[0]
178        )));
179    }
180    let cursor_desc = b[1] != 0;
181    if cursor_desc != (order == Order::Desc) {
182        return Err(Error::InvalidCursor(
183            "cursor was issued under the opposite sort order".into(),
184        ));
185    }
186    Ok(Position {
187        seq: u64::from_le_bytes(b[2..10].try_into().unwrap()),
188        idx: u64::from_le_bytes(b[10..18].try_into().unwrap()),
189    })
190}
191
192/// Snapshot of one entity exactly as it was when the log was written.
193#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct TargetSnapshot {
195    pub entity_registry_uid: Uid,
196    pub entity_type: String,
197    pub entity_id: String,
198    pub version: u32,
199    /// As-recorded field values: a later rename does not alter what shows here.
200    pub fields: BTreeMap<String, StoredValue>,
201    pub deleted: bool,
202    /// The referenced registry version could not be resolved (e.g. the
203    /// registry lost data the log outlived). The log itself is still valid —
204    /// one unresolvable entity must not make whole query results unreadable —
205    /// so it renders with this placeholder instead of failing.
206    #[serde(default)]
207    pub missing: bool,
208}
209
210/// A fully resolved query hit: the raw log row plus actor/target snapshots.
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct LogView {
213    #[serde(with = "crate::id::hex_serde")]
214    pub log_id: Uid,
215    pub timestamp_micros: u64,
216    /// RFC 3339, always UTC+0.
217    pub timestamp: String,
218    pub actor: TargetSnapshot,
219    pub method: String,
220    pub url: String,
221    pub content: Content,
222    pub targets: Vec<TargetSnapshot>,
223    pub custom: BTreeMap<String, Value>,
224}