Skip to main content

daemon/grpc_local_impl/
operation_log_query.rs

// SPDX-License-Identifier: Apache-2.0
//! Local gRPC service for the W2 `OperationLogQueryService`.
//!
//! Read-only wrapper around [`refs::operation_index::OperationLogIndex`].
//! Translates protobuf request/response shapes to/from the index's native
//! [`OperationLogQuery`] / [`IndexedOperation`] types. No state changes,
//! no idempotency wrapper.
8
9use std::pin::Pin;
10
11use chrono::TimeZone;
12use futures::Stream;
13use grpc::heddle::v1::{
14    OperationHit, QueryOperationsRequest, QueryOperationsResponse, StreamOperationsRequest,
15    operation_log_query_service_server::OperationLogQueryService,
16};
17use refs::operation_index::{IndexedOperation, OperationLogIndex, OperationLogQuery};
18use tokio_stream::wrappers::ReceiverStream;
19use tonic::{Request, Response, Status};
20
21use super::{GrpcLocalService, to_status};
22
23#[derive(Clone)]
24pub struct LocalOperationLogQueryService {
25    inner: GrpcLocalService,
26}
27
28impl LocalOperationLogQueryService {
29    pub fn new(inner: GrpcLocalService) -> Self {
30        Self { inner }
31    }
32}
33
34/// Convert a unix-epoch-seconds field from the proto. `0` means "unset"
35/// because proto3 scalars don't distinguish presence; any other value is
36/// passed to [`chrono::Utc.timestamp_opt`] and discarded if out of range.
37fn parse_unix_secs(secs: i64) -> Option<chrono::DateTime<chrono::Utc>> {
38    if secs == 0 {
39        return None;
40    }
41    chrono::Utc.timestamp_opt(secs, 0).single()
42}
43
/// Verb filter applied when a request sets `include_checkpoints = false`
/// and supplies no verbs of its own.
///
/// The entries are the snake-case verb names used by the index, which in
/// turn mirror the `OpRecord` variants. `"checkpoint"` is deliberately
/// absent: leaving it out is what hides checkpoints from default queries.
const DEFAULT_NON_CHECKPOINT_VERBS: &[&str] = &[
    "snapshot",
    "goto",
    "thread_create",
    "thread_delete",
    "thread_update",
    "fork",
    "collapse",
    "marker_create",
    "marker_delete",
    "transaction_abort",
    "ephemeral_thread_collapse",
    "conflict_resolved",
];
62
63fn build_query(req: &QueryOperationsRequest) -> OperationLogQuery {
64    let mut q = OperationLogQuery {
65        actor: (!req.actor.is_empty()).then(|| req.actor.clone()),
66        symbol: (!req.symbol.is_empty()).then(|| req.symbol.clone()),
67        signal_kind: (!req.signal_kind.is_empty()).then(|| req.signal_kind.clone()),
68        thread: (!req.thread.is_empty()).then(|| req.thread.clone()),
69        verbs: (!req.verbs.is_empty()).then(|| req.verbs.clone()),
70        since: parse_unix_secs(req.since_secs),
71        until: parse_unix_secs(req.until_secs),
72        limit: (req.limit > 0).then_some(req.limit as usize),
73    };
74    if !req.include_checkpoints && q.verbs.is_none() {
75        q.verbs = Some(
76            DEFAULT_NON_CHECKPOINT_VERBS
77                .iter()
78                .map(|s| (*s).to_string())
79                .collect(),
80        );
81    }
82    q
83}
84
85fn hit_to_proto(hit: IndexedOperation) -> OperationHit {
86    OperationHit {
87        seq: hit.seq,
88        timestamp_secs: hit.timestamp_secs,
89        verb: hit.verb,
90        actor_email: hit.actor_email,
91        operation_id: hit.operation_id.map(|o| o.to_string()).unwrap_or_default(),
92        thread: hit.thread.unwrap_or_default(),
93        symbols: hit.symbols,
94        signal_kinds: hit.signal_kinds,
95        change_id: hit
96            .change_id
97            .map(|c| c.as_bytes().to_vec())
98            .unwrap_or_default(),
99    }
100}
101
102#[tonic::async_trait]
103impl OperationLogQueryService for LocalOperationLogQueryService {
104    type StreamOperationsStream = Pin<Box<dyn Stream<Item = Result<OperationHit, Status>> + Send>>;
105
106    async fn query_operations(
107        &self,
108        request: Request<QueryOperationsRequest>,
109    ) -> Result<Response<QueryOperationsResponse>, Status> {
110        let req = request.into_inner();
111        let q = build_query(&req);
112        let index = OperationLogIndex::new(self.inner.repo().heddle_dir());
113        let hits = index.query(&q).map_err(to_status)?;
114        let proto_hits = hits.into_iter().map(hit_to_proto).collect();
115        Ok(Response::new(QueryOperationsResponse { hits: proto_hits }))
116    }
117
118    async fn stream_operations(
119        &self,
120        request: Request<StreamOperationsRequest>,
121    ) -> Result<Response<Self::StreamOperationsStream>, Status> {
122        let req = request.into_inner();
123        let inner_req = req.query.unwrap_or_default();
124        let q = build_query(&inner_req);
125        let index = OperationLogIndex::new(self.inner.repo().heddle_dir());
126        let hits = index.query(&q).map_err(to_status)?;
127
128        let (tx, rx) = tokio::sync::mpsc::channel(64);
129        tokio::spawn(async move {
130            for hit in hits {
131                if tx.send(Ok(hit_to_proto(hit))).await.is_err() {
132                    break;
133                }
134            }
135        });
136        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
137    }
138}
139
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use objects::object::ChangeId;
    use refs::operation_index::IndexedOperation;
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;

    use super::*;

    /// Timestamp (unix seconds) of the first fixture record in every test.
    const BASE_TS: i64 = 1_700_000_000;
    /// Actor used by the tests that do not filter on actor.
    const ALICE: &str = "alice@example.com";

    /// Build an index record with fixed thread/symbol/change-id and the
    /// given sequence number, timestamp, actor and verb.
    fn make_op(seq: u64, ts_secs: i64, actor: &str, verb: &str) -> IndexedOperation {
        IndexedOperation {
            seq,
            timestamp_secs: ts_secs,
            verb: verb.to_owned(),
            actor_email: actor.to_owned(),
            operation_id: None,
            thread: Some(String::from("main")),
            symbols: vec![String::from("src/lib.rs:foo")],
            signal_kinds: Vec::new(),
            change_id: Some(ChangeId::from_bytes([1; 16])),
        }
    }

    /// Initialise a repository in a fresh temp dir and wrap it in the
    /// service under test. The `TempDir` is returned so the caller keeps the
    /// directory alive for the duration of the test.
    fn fresh_service() -> (TempDir, LocalOperationLogQueryService) {
        let temp = TempDir::new().unwrap();
        let repo = Arc::new(Repository::init_default(temp.path()).unwrap());
        let dedup = Arc::new(OperationDedupStore::open(repo.heddle_dir()).unwrap());
        let svc = LocalOperationLogQueryService::new(GrpcLocalService::new(repo, dedup));
        (temp, svc)
    }

    /// Append `op` to the operation index of the service's repository.
    fn write_op(svc: &LocalOperationLogQueryService, op: IndexedOperation) {
        OperationLogIndex::new(svc.inner.repo().heddle_dir())
            .record(op)
            .unwrap();
    }

    // NOTE: despite its name, the filter exercised here is `actor`; no
    // `since_secs`/`until_secs` window is set on the request.
    #[tokio::test]
    async fn query_returns_records_within_window() {
        let (_t, svc) = fresh_service();
        let fixtures = [
            (1, 0, "alice@example.com"),
            (2, 60, "bob@example.com"),
            (3, 120, "carol@example.com"),
        ];
        for (seq, offset_secs, actor) in fixtures {
            write_op(&svc, make_op(seq, BASE_TS + offset_secs, actor, "snapshot"));
        }

        let req = QueryOperationsRequest {
            actor: "alice@example.com".into(),
            include_checkpoints: true,
            ..Default::default()
        };
        let hits = svc
            .query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner()
            .hits;

        assert_eq!(hits.len(), 1);
        let only = &hits[0];
        assert_eq!(only.seq, 1);
        assert_eq!(only.actor_email, "alice@example.com");
    }

    #[tokio::test]
    async fn query_excludes_checkpoints_by_default_when_verbs_unset() {
        let (_t, svc) = fresh_service();
        for (seq, offset_secs, verb) in [(1, 0, "checkpoint"), (2, 60, "snapshot")] {
            write_op(&svc, make_op(seq, BASE_TS + offset_secs, ALICE, verb));
        }

        let req = QueryOperationsRequest {
            include_checkpoints: false,
            ..Default::default()
        };
        let hits = svc
            .query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner()
            .hits;

        // Only the non-checkpoint record survives the default verb filter.
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].verb, "snapshot");
    }

    #[tokio::test]
    async fn stream_operations_yields_all_hits() {
        let (_t, svc) = fresh_service();
        // Five records, one minute apart, with seq 0..=4.
        for (seq, ts_secs) in (0..5u64).zip((BASE_TS..).step_by(60)) {
            write_op(&svc, make_op(seq, ts_secs, ALICE, "snapshot"));
        }

        let req = StreamOperationsRequest {
            repo_path: String::new(),
            query: Some(QueryOperationsRequest {
                include_checkpoints: true,
                ..Default::default()
            }),
        };
        let stream = svc
            .stream_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner();

        // Drain the stream to completion; any `Err` item fails the test.
        let collected: Vec<OperationHit> = stream.map(|item| item.unwrap()).collect().await;
        assert_eq!(collected.len(), 5);
    }
}