Skip to main content

heddle_core/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Structured query over the operation log.
3
4use std::{collections::BTreeMap, path::Path};
5
6use chrono::TimeZone;
7use objects::{
8    error::Result,
9    object::{ChangeId, OperationId},
10};
11use oplog::{OpEntry, OpLog, OpLogBackend, OpRecord};
12use refs::refs::{IndexedOperation, OperationLogIndex, OperationLogQuery};
13use schemars::JsonSchema;
14use serde::Serialize;
15
16use crate::{
17    ExecutionContext, HeddleReport, MachineOutputKind, OutputDiscriminator, ReportContract,
18    schema_for_report,
19};
20
/// Filters accepted by the operation-log query facade.
///
/// Filters are expressed with "unset" sentinels instead of `Option`: an empty
/// string or list, or a zero number, leaves the corresponding filter disabled
/// when `build_query` translates the request.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueryRequest {
    /// Actor email to filter on; empty disables the filter.
    pub actor: String,
    /// Symbol to filter on; empty disables the filter.
    pub symbol: String,
    /// Signal kind to filter on; empty disables the filter.
    pub signal_kind: String,
    /// Thread name to filter on; empty disables the filter.
    pub thread: String,
    /// Verbs to accept; empty means no explicit verb filter was requested.
    pub verbs: Vec<String>,
    /// Lower time bound as Unix seconds; `0` disables the bound.
    pub since_secs: i64,
    /// Upper time bound as Unix seconds; `0` disables the bound.
    pub until_secs: i64,
    /// Maximum number of hits to return; `0` means unlimited.
    pub limit: u32,
    /// When `false` and `verbs` is empty, the verb filter defaults to
    /// `OpRecord::verbs(false)` instead of staying unset.
    pub include_checkpoints: bool,
}
34
35#[derive(Debug, Clone, Serialize, JsonSchema, PartialEq, Eq)]
36pub struct QueryReport {
37    pub output_kind: &'static str,
38    pub hits: Vec<QueryHit>,
39}
40
41impl QueryReport {
42    pub const CONTRACT: ReportContract = ReportContract {
43        schema_name: "query",
44        machine_output_kind: MachineOutputKind::Json,
45        output_discriminator: Some(OutputDiscriminator {
46            field: "output_kind",
47            value: "query",
48        }),
49        schema: schema_for_report::<QueryReport>,
50    };
51}
52
53impl HeddleReport for QueryReport {
54    const CONTRACT: ReportContract = QueryReport::CONTRACT;
55}
56
57#[derive(Debug, Clone, Serialize, JsonSchema, PartialEq, Eq)]
58pub struct QueryHit {
59    pub seq: u64,
60    pub timestamp_secs: i64,
61    pub verb: String,
62    pub actor_email: String,
63    pub operation_id: Option<String>,
64    pub thread: Option<String>,
65    pub symbols: Vec<String>,
66    pub signal_kinds: Vec<String>,
67    pub change_id: Option<String>,
68}
69
/// Upper bound on how many oplog entries the fallback scan requests.
///
/// Query is an operator-facing inspection command, so it has to answer from
/// the live oplog even before the rebuildable index sidecar has been warmed.
/// The scan stays bounded; long-tail history is served by the index once it
/// is populated.
const OPLOG_FALLBACK_SCAN_WINDOW: usize = 100_000;
74
75pub fn query(ctx: &ExecutionContext, req: QueryRequest) -> Result<QueryReport> {
76    let repo = ctx.require_repo()?;
77    let q = build_query(&req);
78    let hits = query_combined(repo.heddle_dir(), &q)?;
79    Ok(QueryReport {
80        output_kind: "query",
81        hits: hits.into_iter().map(hit_to_report).collect(),
82    })
83}
84
85fn build_query(req: &QueryRequest) -> OperationLogQuery {
86    let mut q = OperationLogQuery {
87        actor: (!req.actor.is_empty()).then(|| req.actor.clone()),
88        symbol: (!req.symbol.is_empty()).then(|| req.symbol.clone()),
89        signal_kind: (!req.signal_kind.is_empty()).then(|| req.signal_kind.clone()),
90        thread: (!req.thread.is_empty()).then(|| req.thread.clone()),
91        verbs: (!req.verbs.is_empty()).then(|| req.verbs.clone()),
92        since: parse_unix_secs(req.since_secs),
93        until: parse_unix_secs(req.until_secs),
94        limit: (req.limit > 0).then_some(req.limit as usize),
95    };
96    if !req.include_checkpoints && q.verbs.is_none() {
97        q.verbs = Some(
98            OpRecord::verbs(false)
99                .iter()
100                .map(|s| s.to_string())
101                .collect(),
102        );
103    }
104    q
105}
106
107fn parse_unix_secs(secs: i64) -> Option<chrono::DateTime<chrono::Utc>> {
108    if secs == 0 {
109        return None;
110    }
111    chrono::Utc.timestamp_opt(secs, 0).single()
112}
113
114fn query_combined(heddle_dir: &Path, query: &OperationLogQuery) -> Result<Vec<IndexedOperation>> {
115    let index = OperationLogIndex::new(heddle_dir);
116    let mut unbounded = query.clone();
117    unbounded.limit = None;
118
119    let mut by_seq = BTreeMap::new();
120    for hit in index.query(&unbounded)? {
121        by_seq.insert(hit.seq, hit);
122    }
123
124    if unbounded.symbol.is_none() && unbounded.signal_kind.is_none() {
125        for hit in query_oplog_fallback(heddle_dir, &unbounded)? {
126            by_seq.entry(hit.seq).or_insert(hit);
127        }
128    }
129
130    let mut hits: Vec<_> = by_seq.into_values().collect();
131    hits.sort_by_key(|hit| hit.seq);
132    if let Some(limit) = query.limit {
133        hits.truncate(limit);
134    }
135    Ok(hits)
136}
137
138fn query_oplog_fallback(
139    heddle_dir: &Path,
140    query: &OperationLogQuery,
141) -> Result<Vec<IndexedOperation>> {
142    let log = OpLog::new_unattributed(heddle_dir);
143    let mut entries = log.recent(OPLOG_FALLBACK_SCAN_WINDOW)?;
144    entries.reverse();
145    let mut hits = Vec::new();
146    for entry in entries {
147        let hit = indexed_from_oplog_entry(&entry);
148        if indexed_operation_matches(&hit, query) {
149            hits.push(hit);
150        }
151    }
152    Ok(hits)
153}
154
155fn indexed_from_oplog_entry(entry: &OpEntry) -> IndexedOperation {
156    IndexedOperation {
157        seq: entry.id,
158        timestamp_secs: entry.timestamp.timestamp(),
159        verb: entry.operation.verb().to_string(),
160        actor_email: entry.actor.email.clone(),
161        operation_id: entry.operation_id,
162        thread: thread_for(&entry.operation),
163        symbols: Vec::new(),
164        signal_kinds: Vec::new(),
165        change_id: primary_change_id(&entry.operation),
166    }
167}
168
169fn indexed_operation_matches(hit: &IndexedOperation, query: &OperationLogQuery) -> bool {
170    if let Some(actor) = &query.actor
171        && &hit.actor_email != actor
172    {
173        return false;
174    }
175    if let Some(symbol) = &query.symbol
176        && !hit.symbols.iter().any(|candidate| candidate == symbol)
177    {
178        return false;
179    }
180    if let Some(kind) = &query.signal_kind
181        && !hit.signal_kinds.iter().any(|candidate| candidate == kind)
182    {
183        return false;
184    }
185    if let Some(thread) = &query.thread
186        && hit.thread.as_deref() != Some(thread.as_str())
187    {
188        return false;
189    }
190    if let Some(verbs) = &query.verbs
191        && !verbs.iter().any(|verb| verb == &hit.verb)
192    {
193        return false;
194    }
195    let timestamp = hit.timestamp();
196    if let Some(start) = query.since
197        && timestamp < start
198    {
199        return false;
200    }
201    if let Some(end) = query.until
202        && timestamp > end
203    {
204        return false;
205    }
206    true
207}
208
209fn hit_to_report(hit: IndexedOperation) -> QueryHit {
210    QueryHit {
211        seq: hit.seq,
212        timestamp_secs: hit.timestamp_secs,
213        verb: hit.verb,
214        actor_email: hit.actor_email,
215        operation_id: hit.operation_id.map(operation_id_to_string),
216        thread: hit.thread,
217        symbols: hit.symbols,
218        signal_kinds: hit.signal_kinds,
219        change_id: hit.change_id.map(|id| id.to_string_full()),
220    }
221}
222
223fn operation_id_to_string(id: OperationId) -> String {
224    id.to_string()
225}
226
227fn thread_for(op: &OpRecord) -> Option<String> {
228    match op {
229        OpRecord::Snapshot { thread, .. } => thread.clone(),
230        OpRecord::ThreadCreate { name, .. } => Some(name.clone()),
231        OpRecord::ThreadDelete { name, .. } => Some(name.clone()),
232        OpRecord::ThreadUpdate { name, .. } => Some(name.clone()),
233        OpRecord::MarkerCreate { name, .. } => Some(name.clone()),
234        OpRecord::MarkerDelete { name, .. } => Some(name.clone()),
235        OpRecord::Checkpoint { thread, .. } => thread.clone(),
236        OpRecord::EphemeralThreadCollapse { thread, .. } => Some(thread.clone()),
237        OpRecord::FastForward { target_thread, .. } => Some(target_thread.clone()),
238        OpRecord::GitCheckpoint { branch, .. } => Some(branch.clone()),
239        OpRecord::RemoteThreadUpdate { thread, .. }
240        | OpRecord::RemoteThreadDelete { thread, .. } => Some(thread.clone()),
241        OpRecord::Goto { .. }
242        | OpRecord::Fork { .. }
243        | OpRecord::Collapse { .. }
244        | OpRecord::TransactionAbort { .. }
245        | OpRecord::TransactionCommit { .. }
246        | OpRecord::ConflictResolved { .. }
247        | OpRecord::Redact { .. }
248        | OpRecord::UndoRecoveryUpdate { .. }
249        | OpRecord::StateVisibilitySet { .. }
250        | OpRecord::StateVisibilityPromote { .. }
251        | OpRecord::Purge { .. } => None,
252    }
253}
254
255fn primary_change_id(op: &OpRecord) -> Option<ChangeId> {
256    match op {
257        OpRecord::Snapshot { new_state, .. } => Some(*new_state),
258        OpRecord::Goto { target, .. } => Some(*target),
259        OpRecord::ThreadCreate { state, .. } => Some(*state),
260        OpRecord::ThreadDelete { state, .. } => Some(*state),
261        OpRecord::ThreadUpdate { new_state, .. } => Some(*new_state),
262        OpRecord::Fork { new_state, .. } => Some(*new_state),
263        OpRecord::Collapse { result, .. } => Some(*result),
264        OpRecord::MarkerCreate { state, .. } => Some(*state),
265        OpRecord::MarkerDelete { state, .. } => Some(*state),
266        OpRecord::Checkpoint { state, .. } => Some(*state),
267        OpRecord::GitCheckpoint { state, .. } => Some(*state),
268        OpRecord::EphemeralThreadCollapse { final_state, .. } => Some(*final_state),
269        OpRecord::Redact { state, .. } => Some(*state),
270        OpRecord::StateVisibilitySet { state, .. }
271        | OpRecord::StateVisibilityPromote { state, .. } => Some(*state),
272        OpRecord::RemoteThreadUpdate { state, .. } | OpRecord::RemoteThreadDelete { state, .. } => {
273            Some(*state)
274        }
275        OpRecord::UndoRecoveryUpdate { state } => Some(*state),
276        OpRecord::TransactionAbort { .. }
277        | OpRecord::TransactionCommit { .. }
278        | OpRecord::ConflictResolved { .. }
279        | OpRecord::Purge { .. }
280        | OpRecord::FastForward { .. } => None,
281    }
282}