Skip to main content

heddle_core/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Structured query over the operation log.
3
4use std::{collections::BTreeMap, path::Path};
5
6use chrono::TimeZone;
7use objects::{
8    error::Result,
9    object::{ChangeId, OperationId},
10};
11use oplog::{OpEntry, OpLog, OpLogBackend, OpRecord};
12use refs::refs::{IndexedOperation, OperationLogIndex, OperationLogQuery};
13use serde::Serialize;
14
15use crate::ExecutionContext;
16
/// Caller-supplied filters for the operation log query facade.
///
/// Each field treats its type's default as "not set": `build_query` turns an
/// empty string or list, a zero timestamp, or a zero limit into an absent
/// filter on the underlying `OperationLogQuery`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueryRequest {
    /// Actor to filter by; an empty string disables the filter.
    pub actor: String,
    /// Symbol to filter by; an empty string disables the filter.
    pub symbol: String,
    /// Signal kind to filter by; an empty string disables the filter.
    pub signal_kind: String,
    /// Thread to filter by; an empty string disables the filter.
    pub thread: String,
    /// Verbs to keep; an empty list means the caller named no verbs.
    pub verbs: Vec<String>,
    /// Lower time bound in Unix seconds; `0` means no lower bound.
    pub since_secs: i64,
    /// Upper time bound in Unix seconds; `0` means no upper bound.
    pub until_secs: i64,
    /// Maximum number of hits to return; `0` means no limit.
    pub limit: u32,
    /// When `false` and `verbs` is empty, the verb filter defaults to
    /// `OpRecord::verbs(false)`.
    pub include_checkpoints: bool,
}
30
31#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
32pub struct QueryReport {
33    pub output_kind: &'static str,
34    pub hits: Vec<QueryHit>,
35}
36
37#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
38pub struct QueryHit {
39    pub seq: u64,
40    pub timestamp_secs: i64,
41    pub verb: String,
42    pub actor_email: String,
43    pub operation_id: Option<String>,
44    pub thread: Option<String>,
45    pub symbols: Vec<String>,
46    pub signal_kinds: Vec<String>,
47    pub change_id: Option<String>,
48}
49
/// Upper bound on how many recent oplog entries the fallback scan reads.
///
/// Query is an operator-facing inspection command, so it has to answer from
/// the live oplog even when the rebuildable index sidecar has not been warmed
/// yet. The scan stays bounded; long-tail history is served by the index once
/// it is populated.
const OPLOG_FALLBACK_SCAN_WINDOW: usize = 100_000;
54
55pub fn query(ctx: &ExecutionContext, req: QueryRequest) -> Result<QueryReport> {
56    let repo = ctx.require_repo()?;
57    let q = build_query(&req);
58    let hits = query_combined(repo.heddle_dir(), &q)?;
59    Ok(QueryReport {
60        output_kind: "query",
61        hits: hits.into_iter().map(hit_to_report).collect(),
62    })
63}
64
65fn build_query(req: &QueryRequest) -> OperationLogQuery {
66    let mut q = OperationLogQuery {
67        actor: (!req.actor.is_empty()).then(|| req.actor.clone()),
68        symbol: (!req.symbol.is_empty()).then(|| req.symbol.clone()),
69        signal_kind: (!req.signal_kind.is_empty()).then(|| req.signal_kind.clone()),
70        thread: (!req.thread.is_empty()).then(|| req.thread.clone()),
71        verbs: (!req.verbs.is_empty()).then(|| req.verbs.clone()),
72        since: parse_unix_secs(req.since_secs),
73        until: parse_unix_secs(req.until_secs),
74        limit: (req.limit > 0).then_some(req.limit as usize),
75    };
76    if !req.include_checkpoints && q.verbs.is_none() {
77        q.verbs = Some(
78            OpRecord::verbs(false)
79                .iter()
80                .map(|s| s.to_string())
81                .collect(),
82        );
83    }
84    q
85}
86
87fn parse_unix_secs(secs: i64) -> Option<chrono::DateTime<chrono::Utc>> {
88    if secs == 0 {
89        return None;
90    }
91    chrono::Utc.timestamp_opt(secs, 0).single()
92}
93
94fn query_combined(heddle_dir: &Path, query: &OperationLogQuery) -> Result<Vec<IndexedOperation>> {
95    let index = OperationLogIndex::new(heddle_dir);
96    let mut unbounded = query.clone();
97    unbounded.limit = None;
98
99    let mut by_seq = BTreeMap::new();
100    for hit in index.query(&unbounded)? {
101        by_seq.insert(hit.seq, hit);
102    }
103
104    if unbounded.symbol.is_none() && unbounded.signal_kind.is_none() {
105        for hit in query_oplog_fallback(heddle_dir, &unbounded)? {
106            by_seq.entry(hit.seq).or_insert(hit);
107        }
108    }
109
110    let mut hits: Vec<_> = by_seq.into_values().collect();
111    hits.sort_by_key(|hit| hit.seq);
112    if let Some(limit) = query.limit {
113        hits.truncate(limit);
114    }
115    Ok(hits)
116}
117
118fn query_oplog_fallback(
119    heddle_dir: &Path,
120    query: &OperationLogQuery,
121) -> Result<Vec<IndexedOperation>> {
122    let log = OpLog::new_unattributed(heddle_dir);
123    let mut entries = log.recent(OPLOG_FALLBACK_SCAN_WINDOW)?;
124    entries.reverse();
125    let mut hits = Vec::new();
126    for entry in entries {
127        let hit = indexed_from_oplog_entry(&entry);
128        if indexed_operation_matches(&hit, query) {
129            hits.push(hit);
130        }
131    }
132    Ok(hits)
133}
134
135fn indexed_from_oplog_entry(entry: &OpEntry) -> IndexedOperation {
136    IndexedOperation {
137        seq: entry.id,
138        timestamp_secs: entry.timestamp.timestamp(),
139        verb: entry.operation.verb().to_string(),
140        actor_email: entry.actor.email.clone(),
141        operation_id: entry.operation_id,
142        thread: thread_for(&entry.operation),
143        symbols: Vec::new(),
144        signal_kinds: Vec::new(),
145        change_id: primary_change_id(&entry.operation),
146    }
147}
148
149fn indexed_operation_matches(hit: &IndexedOperation, query: &OperationLogQuery) -> bool {
150    if let Some(actor) = &query.actor
151        && &hit.actor_email != actor
152    {
153        return false;
154    }
155    if let Some(symbol) = &query.symbol
156        && !hit.symbols.iter().any(|candidate| candidate == symbol)
157    {
158        return false;
159    }
160    if let Some(kind) = &query.signal_kind
161        && !hit.signal_kinds.iter().any(|candidate| candidate == kind)
162    {
163        return false;
164    }
165    if let Some(thread) = &query.thread
166        && hit.thread.as_deref() != Some(thread.as_str())
167    {
168        return false;
169    }
170    if let Some(verbs) = &query.verbs
171        && !verbs.iter().any(|verb| verb == &hit.verb)
172    {
173        return false;
174    }
175    let timestamp = hit.timestamp();
176    if let Some(start) = query.since
177        && timestamp < start
178    {
179        return false;
180    }
181    if let Some(end) = query.until
182        && timestamp > end
183    {
184        return false;
185    }
186    true
187}
188
189fn hit_to_report(hit: IndexedOperation) -> QueryHit {
190    QueryHit {
191        seq: hit.seq,
192        timestamp_secs: hit.timestamp_secs,
193        verb: hit.verb,
194        actor_email: hit.actor_email,
195        operation_id: hit.operation_id.map(operation_id_to_string),
196        thread: hit.thread,
197        symbols: hit.symbols,
198        signal_kinds: hit.signal_kinds,
199        change_id: hit.change_id.map(|id| id.to_string_full()),
200    }
201}
202
203fn operation_id_to_string(id: OperationId) -> String {
204    id.to_string()
205}
206
207fn thread_for(op: &OpRecord) -> Option<String> {
208    match op {
209        OpRecord::Snapshot { thread, .. } => thread.clone(),
210        OpRecord::ThreadCreate { name, .. } => Some(name.clone()),
211        OpRecord::ThreadDelete { name, .. } => Some(name.clone()),
212        OpRecord::ThreadUpdate { name, .. } => Some(name.clone()),
213        OpRecord::MarkerCreate { name, .. } => Some(name.clone()),
214        OpRecord::MarkerDelete { name, .. } => Some(name.clone()),
215        OpRecord::Checkpoint { thread, .. } => thread.clone(),
216        OpRecord::EphemeralThreadCollapse { thread, .. } => Some(thread.clone()),
217        OpRecord::FastForward { target_thread, .. } => Some(target_thread.clone()),
218        OpRecord::GitCheckpoint { branch, .. } => Some(branch.clone()),
219        OpRecord::RemoteThreadUpdate { thread, .. }
220        | OpRecord::RemoteThreadDelete { thread, .. } => Some(thread.clone()),
221        OpRecord::Goto { .. }
222        | OpRecord::Fork { .. }
223        | OpRecord::Collapse { .. }
224        | OpRecord::TransactionAbort { .. }
225        | OpRecord::TransactionCommit { .. }
226        | OpRecord::ConflictResolved { .. }
227        | OpRecord::Redact { .. }
228        | OpRecord::UndoRecoveryUpdate { .. }
229        | OpRecord::StateVisibilitySet { .. }
230        | OpRecord::StateVisibilityPromote { .. }
231        | OpRecord::Purge { .. } => None,
232    }
233}
234
235fn primary_change_id(op: &OpRecord) -> Option<ChangeId> {
236    match op {
237        OpRecord::Snapshot { new_state, .. } => Some(*new_state),
238        OpRecord::Goto { target, .. } => Some(*target),
239        OpRecord::ThreadCreate { state, .. } => Some(*state),
240        OpRecord::ThreadDelete { state, .. } => Some(*state),
241        OpRecord::ThreadUpdate { new_state, .. } => Some(*new_state),
242        OpRecord::Fork { new_state, .. } => Some(*new_state),
243        OpRecord::Collapse { result, .. } => Some(*result),
244        OpRecord::MarkerCreate { state, .. } => Some(*state),
245        OpRecord::MarkerDelete { state, .. } => Some(*state),
246        OpRecord::Checkpoint { state, .. } => Some(*state),
247        OpRecord::GitCheckpoint { state, .. } => Some(*state),
248        OpRecord::EphemeralThreadCollapse { final_state, .. } => Some(*final_state),
249        OpRecord::Redact { state, .. } => Some(*state),
250        OpRecord::StateVisibilitySet { state, .. }
251        | OpRecord::StateVisibilityPromote { state, .. } => Some(*state),
252        OpRecord::RemoteThreadUpdate { state, .. } | OpRecord::RemoteThreadDelete { state, .. } => {
253            Some(*state)
254        }
255        OpRecord::UndoRecoveryUpdate { state } => Some(*state),
256        OpRecord::TransactionAbort { .. }
257        | OpRecord::TransactionCommit { .. }
258        | OpRecord::ConflictResolved { .. }
259        | OpRecord::Purge { .. }
260        | OpRecord::FastForward { .. } => None,
261    }
262}