1use std::pin::Pin;
10
11use chrono::TimeZone;
12use futures::Stream;
13use grpc::heddle::v1::{
14 OperationHit, QueryOperationsRequest, QueryOperationsResponse, StreamOperationsRequest,
15 operation_log_query_service_server::OperationLogQueryService,
16};
17use refs::operation_index::{IndexedOperation, OperationLogIndex, OperationLogQuery};
18use tokio_stream::wrappers::ReceiverStream;
19use tonic::{Request, Response, Status};
20
21use super::{GrpcLocalService, to_status};
22
23#[derive(Clone)]
24pub struct LocalOperationLogQueryService {
25 inner: GrpcLocalService,
26}
27
28impl LocalOperationLogQueryService {
29 pub fn new(inner: GrpcLocalService) -> Self {
30 Self { inner }
31 }
32}
33
34fn parse_unix_secs(secs: i64) -> Option<chrono::DateTime<chrono::Utc>> {
38 if secs == 0 {
39 return None;
40 }
41 chrono::Utc.timestamp_opt(secs, 0).single()
42}
43
44const DEFAULT_NON_CHECKPOINT_VERBS: &[&str] = &[
49 "snapshot",
50 "goto",
51 "thread_create",
52 "thread_delete",
53 "thread_update",
54 "fork",
55 "collapse",
56 "marker_create",
57 "marker_delete",
58 "transaction_abort",
59 "ephemeral_thread_collapse",
60 "conflict_resolved",
61];
62
63fn build_query(req: &QueryOperationsRequest) -> OperationLogQuery {
64 let mut q = OperationLogQuery {
65 actor: (!req.actor.is_empty()).then(|| req.actor.clone()),
66 symbol: (!req.symbol.is_empty()).then(|| req.symbol.clone()),
67 signal_kind: (!req.signal_kind.is_empty()).then(|| req.signal_kind.clone()),
68 thread: (!req.thread.is_empty()).then(|| req.thread.clone()),
69 verbs: (!req.verbs.is_empty()).then(|| req.verbs.clone()),
70 since: parse_unix_secs(req.since_secs),
71 until: parse_unix_secs(req.until_secs),
72 limit: (req.limit > 0).then_some(req.limit as usize),
73 };
74 if !req.include_checkpoints && q.verbs.is_none() {
75 q.verbs = Some(
76 DEFAULT_NON_CHECKPOINT_VERBS
77 .iter()
78 .map(|s| (*s).to_string())
79 .collect(),
80 );
81 }
82 q
83}
84
85fn hit_to_proto(hit: IndexedOperation) -> OperationHit {
86 OperationHit {
87 seq: hit.seq,
88 timestamp_secs: hit.timestamp_secs,
89 verb: hit.verb,
90 actor_email: hit.actor_email,
91 operation_id: hit.operation_id.map(|o| o.to_string()).unwrap_or_default(),
92 thread: hit.thread.unwrap_or_default(),
93 symbols: hit.symbols,
94 signal_kinds: hit.signal_kinds,
95 change_id: hit
96 .change_id
97 .map(|c| c.as_bytes().to_vec())
98 .unwrap_or_default(),
99 }
100}
101
102#[tonic::async_trait]
103impl OperationLogQueryService for LocalOperationLogQueryService {
104 type StreamOperationsStream = Pin<Box<dyn Stream<Item = Result<OperationHit, Status>> + Send>>;
105
106 async fn query_operations(
107 &self,
108 request: Request<QueryOperationsRequest>,
109 ) -> Result<Response<QueryOperationsResponse>, Status> {
110 let req = request.into_inner();
111 let q = build_query(&req);
112 let index = OperationLogIndex::new(self.inner.repo().heddle_dir());
113 let hits = index.query(&q).map_err(to_status)?;
114 let proto_hits = hits.into_iter().map(hit_to_proto).collect();
115 Ok(Response::new(QueryOperationsResponse { hits: proto_hits }))
116 }
117
118 async fn stream_operations(
119 &self,
120 request: Request<StreamOperationsRequest>,
121 ) -> Result<Response<Self::StreamOperationsStream>, Status> {
122 let req = request.into_inner();
123 let inner_req = req.query.unwrap_or_default();
124 let q = build_query(&inner_req);
125 let index = OperationLogIndex::new(self.inner.repo().heddle_dir());
126 let hits = index.query(&q).map_err(to_status)?;
127
128 let (tx, rx) = tokio::sync::mpsc::channel(64);
129 tokio::spawn(async move {
130 for hit in hits {
131 if tx.send(Ok(hit_to_proto(hit))).await.is_err() {
132 break;
133 }
134 }
135 });
136 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use std::sync::Arc;
143
144 use futures::StreamExt;
145 use objects::object::ChangeId;
146 use refs::operation_index::IndexedOperation;
147 use repo::{Repository, operation_dedup::OperationDedupStore};
148 use tempfile::TempDir;
149
150 use super::*;
151
152 fn make_op(seq: u64, ts_secs: i64, actor: &str, verb: &str) -> IndexedOperation {
153 IndexedOperation {
154 seq,
155 timestamp_secs: ts_secs,
156 verb: verb.to_string(),
157 actor_email: actor.to_string(),
158 operation_id: None,
159 thread: Some("main".into()),
160 symbols: vec!["src/lib.rs:foo".into()],
161 signal_kinds: vec![],
162 change_id: Some(ChangeId::from_bytes([1; 16])),
163 }
164 }
165
166 fn fresh_service() -> (TempDir, LocalOperationLogQueryService) {
167 let temp = TempDir::new().unwrap();
168 let repo = Repository::init_default(temp.path()).unwrap();
169 let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
170 let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
171 let svc = LocalOperationLogQueryService::new(inner);
172 (temp, svc)
173 }
174
175 fn write_op(svc: &LocalOperationLogQueryService, op: IndexedOperation) {
176 let index = OperationLogIndex::new(svc.inner.repo().heddle_dir());
177 index.record(op).unwrap();
178 }
179
180 #[tokio::test]
181 async fn query_returns_records_within_window() {
182 let (_t, svc) = fresh_service();
183 write_op(
184 &svc,
185 make_op(1, 1_700_000_000, "alice@example.com", "snapshot"),
186 );
187 write_op(
188 &svc,
189 make_op(2, 1_700_000_060, "bob@example.com", "snapshot"),
190 );
191 write_op(
192 &svc,
193 make_op(3, 1_700_000_120, "carol@example.com", "snapshot"),
194 );
195
196 let req = QueryOperationsRequest {
197 actor: "alice@example.com".into(),
198 include_checkpoints: true,
199 ..Default::default()
200 };
201 let resp = svc
202 .query_operations(Request::new(req))
203 .await
204 .unwrap()
205 .into_inner();
206 assert_eq!(resp.hits.len(), 1);
207 assert_eq!(resp.hits[0].seq, 1);
208 assert_eq!(resp.hits[0].actor_email, "alice@example.com");
209 }
210
211 #[tokio::test]
212 async fn query_excludes_checkpoints_by_default_when_verbs_unset() {
213 let (_t, svc) = fresh_service();
214 write_op(
215 &svc,
216 make_op(1, 1_700_000_000, "alice@example.com", "checkpoint"),
217 );
218 write_op(
219 &svc,
220 make_op(2, 1_700_000_060, "alice@example.com", "snapshot"),
221 );
222
223 let req = QueryOperationsRequest {
224 include_checkpoints: false,
225 ..Default::default()
226 };
227 let resp = svc
228 .query_operations(Request::new(req))
229 .await
230 .unwrap()
231 .into_inner();
232 assert_eq!(resp.hits.len(), 1);
233 assert_eq!(resp.hits[0].verb, "snapshot");
234 }
235
236 #[tokio::test]
237 async fn stream_operations_yields_all_hits() {
238 let (_t, svc) = fresh_service();
239 for i in 0..5u64 {
240 write_op(
241 &svc,
242 make_op(
243 i,
244 1_700_000_000 + (i as i64) * 60,
245 "alice@example.com",
246 "snapshot",
247 ),
248 );
249 }
250
251 let req = StreamOperationsRequest {
252 repo_path: String::new(),
253 query: Some(QueryOperationsRequest {
254 include_checkpoints: true,
255 ..Default::default()
256 }),
257 };
258 let resp = svc.stream_operations(Request::new(req)).await.unwrap();
259
260 let mut stream = resp.into_inner();
261 let mut collected = Vec::new();
262 while let Some(item) = stream.next().await {
263 collected.push(item.unwrap());
264 }
265 assert_eq!(collected.len(), 5);
266 }
267}