Skip to main content

daemon/grpc_local_impl/
operation_log_query.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local gRPC service for the W2 `OperationLogQueryService`.
3//!
//! Read-only wrapper around [`refs::operation_index::OperationLogIndex`],
//! supplemented by a bounded scan of the live oplog so queries still answer
//! before the index sidecar has been populated. Translates protobuf
//! request/response shapes to/from the index's native [`OperationLogQuery`] /
//! [`IndexedOperation`] types. No state changes, no idempotency wrapper.
8
9use std::{collections::BTreeMap, path::Path, pin::Pin};
10
11use chrono::TimeZone;
12use futures::Stream;
13use grpc::heddle::v1::{
14    OperationHit, QueryOperationsRequest, QueryOperationsResponse, StreamOperationsRequest,
15    operation_log_query_service_server::OperationLogQueryService,
16};
17use objects::{error::Result as HeddleResult, object::ChangeId};
18use oplog::{OpEntry, OpLog, OpLogBackend, OpRecord};
19use refs::operation_index::{IndexedOperation, OperationLogIndex, OperationLogQuery};
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::{Request, Response, Status};
22
23use super::{GrpcLocalService, to_status};
24
25#[derive(Clone)]
26pub struct LocalOperationLogQueryService {
27    inner: GrpcLocalService,
28}
29
30impl LocalOperationLogQueryService {
31    pub fn new(inner: GrpcLocalService) -> Self {
32        Self { inner }
33    }
34}
35
36/// Convert a unix-epoch-seconds field from the proto. `0` means "unset"
37/// because proto3 scalars don't distinguish presence; any other value is
38/// passed to [`chrono::Utc.timestamp_opt`] and discarded if out of range.
39fn parse_unix_secs(secs: i64) -> Option<chrono::DateTime<chrono::Utc>> {
40    if secs == 0 {
41        return None;
42    }
43    chrono::Utc.timestamp_opt(secs, 0).single()
44}
45
/// How many entries the live-oplog fallback asks `OpLog::recent` for.
///
/// Querying is an operator-facing inspection command, so it must answer from
/// the live oplog even before the rebuildable index sidecar has been warmed.
/// The scan is deliberately bounded; older history is served by the index
/// once it has been populated.
const OPLOG_FALLBACK_SCAN_WINDOW: usize = 100 * 1_000;
50
51fn build_query(req: &QueryOperationsRequest) -> OperationLogQuery {
52    let mut q = OperationLogQuery {
53        actor: (!req.actor.is_empty()).then(|| req.actor.clone()),
54        symbol: (!req.symbol.is_empty()).then(|| req.symbol.clone()),
55        signal_kind: (!req.signal_kind.is_empty()).then(|| req.signal_kind.clone()),
56        thread: (!req.thread.is_empty()).then(|| req.thread.clone()),
57        verbs: (!req.verbs.is_empty()).then(|| req.verbs.clone()),
58        since: parse_unix_secs(req.since_secs),
59        until: parse_unix_secs(req.until_secs),
60        limit: (req.limit > 0).then_some(req.limit as usize),
61    };
62    if !req.include_checkpoints && q.verbs.is_none() {
63        // Derived from the oplog verb catalog (the single source of truth), not
64        // a hand-maintained list — so a new `OpRecord` variant is surfaced by
65        // default the moment it joins the catalog, instead of being silently
66        // dropped from the default query (heddle#354 r9, cid 3330304663).
67        q.verbs = Some(
68            OpRecord::verbs(false)
69                .iter()
70                .map(|s| s.to_string())
71                .collect(),
72        );
73    }
74    q
75}
76
77fn hit_to_proto(hit: IndexedOperation) -> OperationHit {
78    OperationHit {
79        seq: hit.seq,
80        timestamp_secs: hit.timestamp_secs,
81        verb: hit.verb,
82        actor_email: hit.actor_email,
83        operation_id: hit.operation_id.map(|o| o.to_string()).unwrap_or_default(),
84        thread: hit.thread.unwrap_or_default(),
85        symbols: hit.symbols,
86        signal_kinds: hit.signal_kinds,
87        change_id: hit
88            .change_id
89            .map(|c| c.as_bytes().to_vec())
90            .unwrap_or_default(),
91    }
92}
93
94fn query_combined(
95    heddle_dir: &Path,
96    query: &OperationLogQuery,
97) -> HeddleResult<Vec<IndexedOperation>> {
98    let index = OperationLogIndex::new(heddle_dir);
99    let mut unbounded = query.clone();
100    unbounded.limit = None;
101
102    let mut by_seq = BTreeMap::new();
103    for hit in index.query(&unbounded)? {
104        by_seq.insert(hit.seq, hit);
105    }
106
107    if unbounded.symbol.is_none() && unbounded.signal_kind.is_none() {
108        for hit in query_oplog_fallback(heddle_dir, &unbounded)? {
109            by_seq.entry(hit.seq).or_insert(hit);
110        }
111    }
112
113    let mut hits: Vec<_> = by_seq.into_values().collect();
114    hits.sort_by_key(|hit| hit.seq);
115    if let Some(limit) = query.limit {
116        hits.truncate(limit);
117    }
118    Ok(hits)
119}
120
121fn query_oplog_fallback(
122    heddle_dir: &Path,
123    query: &OperationLogQuery,
124) -> HeddleResult<Vec<IndexedOperation>> {
125    let log = OpLog::new_unattributed(heddle_dir);
126    let mut entries = log.recent(OPLOG_FALLBACK_SCAN_WINDOW)?;
127    entries.reverse();
128    let mut hits = Vec::new();
129    for entry in entries {
130        let hit = indexed_from_oplog_entry(&entry);
131        if indexed_operation_matches(&hit, query) {
132            hits.push(hit);
133        }
134    }
135    Ok(hits)
136}
137
138fn indexed_from_oplog_entry(entry: &OpEntry) -> IndexedOperation {
139    IndexedOperation {
140        seq: entry.id,
141        timestamp_secs: entry.timestamp.timestamp(),
142        verb: entry.operation.verb().to_string(),
143        actor_email: entry.actor.email.clone(),
144        operation_id: entry.operation_id,
145        thread: thread_for(&entry.operation),
146        symbols: Vec::new(),
147        signal_kinds: Vec::new(),
148        change_id: primary_change_id(&entry.operation),
149    }
150}
151
152fn indexed_operation_matches(hit: &IndexedOperation, query: &OperationLogQuery) -> bool {
153    if let Some(actor) = &query.actor
154        && &hit.actor_email != actor
155    {
156        return false;
157    }
158    if let Some(symbol) = &query.symbol
159        && !hit.symbols.iter().any(|candidate| candidate == symbol)
160    {
161        return false;
162    }
163    if let Some(kind) = &query.signal_kind
164        && !hit.signal_kinds.iter().any(|candidate| candidate == kind)
165    {
166        return false;
167    }
168    if let Some(thread) = &query.thread
169        && hit.thread.as_deref() != Some(thread.as_str())
170    {
171        return false;
172    }
173    if let Some(verbs) = &query.verbs
174        && !verbs.iter().any(|verb| verb == &hit.verb)
175    {
176        return false;
177    }
178    let timestamp = hit.timestamp();
179    if let Some(start) = query.since
180        && timestamp < start
181    {
182        return false;
183    }
184    if let Some(end) = query.until
185        && timestamp > end
186    {
187        return false;
188    }
189    true
190}
191
192fn thread_for(op: &OpRecord) -> Option<String> {
193    match op {
194        OpRecord::Snapshot { thread, .. } => thread.clone(),
195        OpRecord::ThreadCreate { name, .. } => Some(name.clone()),
196        OpRecord::ThreadDelete { name, .. } => Some(name.clone()),
197        OpRecord::ThreadUpdate { name, .. } => Some(name.clone()),
198        OpRecord::MarkerCreate { name, .. } => Some(name.clone()),
199        OpRecord::MarkerDelete { name, .. } => Some(name.clone()),
200        OpRecord::Checkpoint { thread, .. } => thread.clone(),
201        OpRecord::EphemeralThreadCollapse { thread, .. } => Some(thread.clone()),
202        OpRecord::FastForward { target_thread, .. } => Some(target_thread.clone()),
203        OpRecord::GitCheckpoint { branch, .. } => Some(branch.clone()),
204        OpRecord::RemoteThreadUpdate { thread, .. }
205        | OpRecord::RemoteThreadDelete { thread, .. } => Some(thread.clone()),
206        OpRecord::Goto { .. }
207        | OpRecord::Fork { .. }
208        | OpRecord::Collapse { .. }
209        | OpRecord::TransactionAbort { .. }
210        | OpRecord::TransactionCommit { .. }
211        | OpRecord::ConflictResolved { .. }
212        | OpRecord::Redact { .. }
213        | OpRecord::UndoRecoveryUpdate { .. }
214        | OpRecord::StateVisibilitySet { .. }
215        | OpRecord::StateVisibilityPromote { .. }
216        | OpRecord::Purge { .. } => None,
217    }
218}
219
220fn primary_change_id(op: &OpRecord) -> Option<ChangeId> {
221    match op {
222        OpRecord::Snapshot { new_state, .. } => Some(*new_state),
223        OpRecord::Goto { target, .. } => Some(*target),
224        OpRecord::ThreadCreate { state, .. } => Some(*state),
225        OpRecord::ThreadDelete { state, .. } => Some(*state),
226        OpRecord::ThreadUpdate { new_state, .. } => Some(*new_state),
227        OpRecord::Fork { new_state, .. } => Some(*new_state),
228        OpRecord::Collapse { result, .. } => Some(*result),
229        OpRecord::MarkerCreate { state, .. } => Some(*state),
230        OpRecord::MarkerDelete { state, .. } => Some(*state),
231        OpRecord::Checkpoint { state, .. } => Some(*state),
232        OpRecord::GitCheckpoint { state, .. } => Some(*state),
233        OpRecord::EphemeralThreadCollapse { final_state, .. } => Some(*final_state),
234        OpRecord::Redact { state, .. } => Some(*state),
235        OpRecord::StateVisibilitySet { state, .. }
236        | OpRecord::StateVisibilityPromote { state, .. } => Some(*state),
237        OpRecord::RemoteThreadUpdate { state, .. } | OpRecord::RemoteThreadDelete { state, .. } => {
238            Some(*state)
239        }
240        OpRecord::UndoRecoveryUpdate { state } => Some(*state),
241        OpRecord::TransactionAbort { .. }
242        | OpRecord::TransactionCommit { .. }
243        | OpRecord::ConflictResolved { .. }
244        | OpRecord::Purge { .. }
245        | OpRecord::FastForward { .. } => None,
246    }
247}
248
249#[tonic::async_trait]
250impl OperationLogQueryService for LocalOperationLogQueryService {
251    type StreamOperationsStream = Pin<Box<dyn Stream<Item = Result<OperationHit, Status>> + Send>>;
252
253    async fn query_operations(
254        &self,
255        request: Request<QueryOperationsRequest>,
256    ) -> Result<Response<QueryOperationsResponse>, Status> {
257        let req = request.into_inner();
258        let q = build_query(&req);
259        let hits = query_combined(self.inner.repo().heddle_dir(), &q).map_err(to_status)?;
260        let proto_hits = hits.into_iter().map(hit_to_proto).collect();
261        Ok(Response::new(QueryOperationsResponse { hits: proto_hits }))
262    }
263
264    async fn stream_operations(
265        &self,
266        request: Request<StreamOperationsRequest>,
267    ) -> Result<Response<Self::StreamOperationsStream>, Status> {
268        let req = request.into_inner();
269        let inner_req = req.query.unwrap_or_default();
270        let q = build_query(&inner_req);
271        let hits = query_combined(self.inner.repo().heddle_dir(), &q).map_err(to_status)?;
272
273        let (tx, rx) = tokio::sync::mpsc::channel(64);
274        tokio::spawn(async move {
275            for hit in hits {
276                if tx.send(Ok(hit_to_proto(hit))).await.is_err() {
277                    break;
278                }
279            }
280        });
281        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
282    }
283}
284
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use objects::object::ChangeId;
    use refs::operation_index::IndexedOperation;
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;

    use super::*;

    /// Index record on `main` touching one symbol, with a fixed change id.
    fn make_op(seq: u64, ts_secs: i64, actor: &str, verb: &str) -> IndexedOperation {
        IndexedOperation {
            seq,
            timestamp_secs: ts_secs,
            verb: verb.to_string(),
            actor_email: actor.to_string(),
            operation_id: None,
            thread: Some("main".into()),
            symbols: vec!["src/lib.rs:foo".into()],
            signal_kinds: vec![],
            change_id: Some(ChangeId::from_bytes([1; 16])),
        }
    }

    /// Fresh repository plus a service over it. Keep the returned `TempDir`
    /// alive for the whole test; dropping it removes the repository directory.
    fn fresh_service() -> (TempDir, LocalOperationLogQueryService) {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
        let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
        let svc = LocalOperationLogQueryService::new(inner);
        (temp, svc)
    }

    /// Record `op` directly in the index sidecar.
    fn write_op(svc: &LocalOperationLogQueryService, op: IndexedOperation) {
        let index = OperationLogIndex::new(svc.inner.repo().heddle_dir());
        index.record(op).unwrap();
    }

    /// Append `op` to the live oplog via `OpLog::record_batch`.
    fn write_oplog_record(svc: &LocalOperationLogQueryService, op: OpRecord) {
        let log = OpLog::new_unattributed(svc.inner.repo().heddle_dir());
        log.record_batch(vec![op]).unwrap();
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn query_filters_by_actor() {
        let (_t, svc) = fresh_service();
        write_op(
            &svc,
            make_op(1, 1_700_000_000, "alice@example.com", "snapshot"),
        );
        write_op(
            &svc,
            make_op(2, 1_700_000_060, "bob@example.com", "snapshot"),
        );
        write_op(
            &svc,
            make_op(3, 1_700_000_120, "carol@example.com", "snapshot"),
        );

        let req = QueryOperationsRequest {
            actor: "alice@example.com".into(),
            include_checkpoints: true,
            ..Default::default()
        };
        let resp = svc
            .query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(resp.hits.len(), 1);
        assert_eq!(resp.hits[0].seq, 1);
        assert_eq!(resp.hits[0].actor_email, "alice@example.com");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn query_returns_records_within_window() {
        let (_t, svc) = fresh_service();
        write_op(
            &svc,
            make_op(1, 1_700_000_000, "alice@example.com", "snapshot"),
        );
        write_op(
            &svc,
            make_op(2, 1_700_000_060, "alice@example.com", "snapshot"),
        );
        write_op(
            &svc,
            make_op(3, 1_700_000_120, "alice@example.com", "snapshot"),
        );

        // Both bounds fall strictly between records, so the assertion does not
        // depend on whether `since`/`until` are treated as inclusive.
        let req = QueryOperationsRequest {
            since_secs: 1_700_000_030,
            until_secs: 1_700_000_090,
            include_checkpoints: true,
            ..Default::default()
        };
        let resp = svc
            .query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(resp.hits.len(), 1);
        assert_eq!(resp.hits[0].seq, 2);
        assert_eq!(resp.hits[0].timestamp_secs, 1_700_000_060);
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn query_excludes_checkpoints_by_default_when_verbs_unset() {
        let (_t, svc) = fresh_service();
        write_op(
            &svc,
            make_op(1, 1_700_000_000, "alice@example.com", "checkpoint"),
        );
        write_op(
            &svc,
            make_op(2, 1_700_000_060, "alice@example.com", "snapshot"),
        );

        let req = QueryOperationsRequest {
            include_checkpoints: false,
            ..Default::default()
        };
        let resp = svc
            .query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(resp.hits.len(), 1);
        assert_eq!(resp.hits[0].verb, "snapshot");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn default_query_includes_newer_non_checkpoint_verbs() {
        // Non-vacuous for cid 3330304663: `transaction_commit` was missing from
        // the old hand-maintained default list, so it was silently dropped from
        // the default (non-checkpoint) view. Now the default is derived from the
        // oplog catalog, so it surfaces.
        let (_t, svc) = fresh_service();
        write_oplog_record(
            &svc,
            OpRecord::TransactionCommit {
                transaction_id: "tx-1".into(),
                op_count: 3,
            },
        );

        let req = QueryOperationsRequest {
            include_checkpoints: false,
            ..Default::default()
        };
        let resp = svc
            .query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(
            resp.hits.len(),
            1,
            "newer non-checkpoint verb must not be dropped"
        );
        assert_eq!(resp.hits[0].verb, "transaction_commit");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn query_reads_live_oplog_when_index_is_empty() {
        let (_t, svc) = fresh_service();
        let state = ChangeId::from_bytes([2; 16]);
        write_oplog_record(
            &svc,
            OpRecord::Checkpoint {
                parent: None,
                state,
                thread: Some("main".into()),
            },
        );

        let req = QueryOperationsRequest {
            include_checkpoints: true,
            ..Default::default()
        };
        let resp = svc
            .query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(resp.hits.len(), 1);
        assert_eq!(resp.hits[0].verb, "checkpoint");
        assert_eq!(resp.hits[0].thread, "main");
        assert_eq!(resp.hits[0].change_id, state.as_bytes().to_vec());
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn stream_operations_yields_all_hits() {
        let (_t, svc) = fresh_service();
        for i in 0..5u64 {
            write_op(
                &svc,
                make_op(
                    i,
                    1_700_000_000 + (i as i64) * 60,
                    "alice@example.com",
                    "snapshot",
                ),
            );
        }

        let req = StreamOperationsRequest {
            repo_path: String::new(),
            query: Some(QueryOperationsRequest {
                include_checkpoints: true,
                ..Default::default()
            }),
        };
        let resp = svc.stream_operations(Request::new(req)).await.unwrap();

        let mut stream = resp.into_inner();
        let mut collected = Vec::new();
        while let Some(item) = stream.next().await {
            collected.push(item.unwrap());
        }
        assert_eq!(collected.len(), 5);
        // Hits are delivered in ascending sequence order.
        let seqs: Vec<u64> = collected.iter().map(|hit| hit.seq).collect();
        assert_eq!(seqs, (0..5u64).collect::<Vec<_>>());
    }
}