1use std::{collections::BTreeMap, path::Path, pin::Pin};
10
11use chrono::TimeZone;
12use futures::Stream;
13use grpc::heddle::v1::{
14 OperationHit, QueryOperationsRequest, QueryOperationsResponse, StreamOperationsRequest,
15 operation_log_query_service_server::OperationLogQueryService,
16};
17use objects::{error::Result as HeddleResult, object::ChangeId};
18use oplog::{OpEntry, OpLog, OpLogBackend, OpRecord};
19use refs::operation_index::{IndexedOperation, OperationLogIndex, OperationLogQuery};
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::{Request, Response, Status};
22
23use super::{GrpcLocalService, to_status};
24
/// gRPC handler type for operation-log queries.
///
/// It is a thin, cloneable wrapper around a [`GrpcLocalService`]; the request
/// handlers reach the repository through `inner.repo()`.
#[derive(Clone)]
pub struct LocalOperationLogQueryService {
    /// Local service state this handler delegates to for repository access.
    inner: GrpcLocalService,
}
29
impl LocalOperationLogQueryService {
    /// Creates a query service that serves requests on top of `inner`.
    pub fn new(inner: GrpcLocalService) -> Self {
        Self { inner }
    }
}
35
36fn parse_unix_secs(secs: i64) -> Option<chrono::DateTime<chrono::Utc>> {
40 if secs == 0 {
41 return None;
42 }
43 chrono::Utc.timestamp_opt(secs, 0).single()
44}
45
/// Number of entries requested from `OpLog::recent` when the live oplog is
/// scanned as a fallback to the operation index.
const OPLOG_FALLBACK_SCAN_WINDOW: usize = 100_000;
50
51fn build_query(req: &QueryOperationsRequest) -> OperationLogQuery {
52 let mut q = OperationLogQuery {
53 actor: (!req.actor.is_empty()).then(|| req.actor.clone()),
54 symbol: (!req.symbol.is_empty()).then(|| req.symbol.clone()),
55 signal_kind: (!req.signal_kind.is_empty()).then(|| req.signal_kind.clone()),
56 thread: (!req.thread.is_empty()).then(|| req.thread.clone()),
57 verbs: (!req.verbs.is_empty()).then(|| req.verbs.clone()),
58 since: parse_unix_secs(req.since_secs),
59 until: parse_unix_secs(req.until_secs),
60 limit: (req.limit > 0).then_some(req.limit as usize),
61 };
62 if !req.include_checkpoints && q.verbs.is_none() {
63 q.verbs = Some(
68 OpRecord::verbs(false)
69 .iter()
70 .map(|s| s.to_string())
71 .collect(),
72 );
73 }
74 q
75}
76
77fn hit_to_proto(hit: IndexedOperation) -> OperationHit {
78 OperationHit {
79 seq: hit.seq,
80 timestamp_secs: hit.timestamp_secs,
81 verb: hit.verb,
82 actor_email: hit.actor_email,
83 operation_id: hit.operation_id.map(|o| o.to_string()).unwrap_or_default(),
84 thread: hit.thread.unwrap_or_default(),
85 symbols: hit.symbols,
86 signal_kinds: hit.signal_kinds,
87 change_id: hit
88 .change_id
89 .map(|c| c.as_bytes().to_vec())
90 .unwrap_or_default(),
91 }
92}
93
94fn query_combined(
95 heddle_dir: &Path,
96 query: &OperationLogQuery,
97) -> HeddleResult<Vec<IndexedOperation>> {
98 let index = OperationLogIndex::new(heddle_dir);
99 let mut unbounded = query.clone();
100 unbounded.limit = None;
101
102 let mut by_seq = BTreeMap::new();
103 for hit in index.query(&unbounded)? {
104 by_seq.insert(hit.seq, hit);
105 }
106
107 if unbounded.symbol.is_none() && unbounded.signal_kind.is_none() {
108 for hit in query_oplog_fallback(heddle_dir, &unbounded)? {
109 by_seq.entry(hit.seq).or_insert(hit);
110 }
111 }
112
113 let mut hits: Vec<_> = by_seq.into_values().collect();
114 hits.sort_by_key(|hit| hit.seq);
115 if let Some(limit) = query.limit {
116 hits.truncate(limit);
117 }
118 Ok(hits)
119}
120
121fn query_oplog_fallback(
122 heddle_dir: &Path,
123 query: &OperationLogQuery,
124) -> HeddleResult<Vec<IndexedOperation>> {
125 let log = OpLog::new_unattributed(heddle_dir);
126 let mut entries = log.recent(OPLOG_FALLBACK_SCAN_WINDOW)?;
127 entries.reverse();
128 let mut hits = Vec::new();
129 for entry in entries {
130 let hit = indexed_from_oplog_entry(&entry);
131 if indexed_operation_matches(&hit, query) {
132 hits.push(hit);
133 }
134 }
135 Ok(hits)
136}
137
138fn indexed_from_oplog_entry(entry: &OpEntry) -> IndexedOperation {
139 IndexedOperation {
140 seq: entry.id,
141 timestamp_secs: entry.timestamp.timestamp(),
142 verb: entry.operation.verb().to_string(),
143 actor_email: entry.actor.email.clone(),
144 operation_id: entry.operation_id,
145 thread: thread_for(&entry.operation),
146 symbols: Vec::new(),
147 signal_kinds: Vec::new(),
148 change_id: primary_change_id(&entry.operation),
149 }
150}
151
152fn indexed_operation_matches(hit: &IndexedOperation, query: &OperationLogQuery) -> bool {
153 if let Some(actor) = &query.actor
154 && &hit.actor_email != actor
155 {
156 return false;
157 }
158 if let Some(symbol) = &query.symbol
159 && !hit.symbols.iter().any(|candidate| candidate == symbol)
160 {
161 return false;
162 }
163 if let Some(kind) = &query.signal_kind
164 && !hit.signal_kinds.iter().any(|candidate| candidate == kind)
165 {
166 return false;
167 }
168 if let Some(thread) = &query.thread
169 && hit.thread.as_deref() != Some(thread.as_str())
170 {
171 return false;
172 }
173 if let Some(verbs) = &query.verbs
174 && !verbs.iter().any(|verb| verb == &hit.verb)
175 {
176 return false;
177 }
178 let timestamp = hit.timestamp();
179 if let Some(start) = query.since
180 && timestamp < start
181 {
182 return false;
183 }
184 if let Some(end) = query.until
185 && timestamp > end
186 {
187 return false;
188 }
189 true
190}
191
192fn thread_for(op: &OpRecord) -> Option<String> {
193 match op {
194 OpRecord::Snapshot { thread, .. } => thread.clone(),
195 OpRecord::ThreadCreate { name, .. } => Some(name.clone()),
196 OpRecord::ThreadDelete { name, .. } => Some(name.clone()),
197 OpRecord::ThreadUpdate { name, .. } => Some(name.clone()),
198 OpRecord::MarkerCreate { name, .. } => Some(name.clone()),
199 OpRecord::MarkerDelete { name, .. } => Some(name.clone()),
200 OpRecord::Checkpoint { thread, .. } => thread.clone(),
201 OpRecord::EphemeralThreadCollapse { thread, .. } => Some(thread.clone()),
202 OpRecord::FastForward { target_thread, .. } => Some(target_thread.clone()),
203 OpRecord::GitCheckpoint { branch, .. } => Some(branch.clone()),
204 OpRecord::RemoteThreadUpdate { thread, .. }
205 | OpRecord::RemoteThreadDelete { thread, .. } => Some(thread.clone()),
206 OpRecord::Goto { .. }
207 | OpRecord::Fork { .. }
208 | OpRecord::Collapse { .. }
209 | OpRecord::TransactionAbort { .. }
210 | OpRecord::TransactionCommit { .. }
211 | OpRecord::ConflictResolved { .. }
212 | OpRecord::Redact { .. }
213 | OpRecord::UndoRecoveryUpdate { .. }
214 | OpRecord::StateVisibilitySet { .. }
215 | OpRecord::StateVisibilityPromote { .. }
216 | OpRecord::Purge { .. } => None,
217 }
218}
219
220fn primary_change_id(op: &OpRecord) -> Option<ChangeId> {
221 match op {
222 OpRecord::Snapshot { new_state, .. } => Some(*new_state),
223 OpRecord::Goto { target, .. } => Some(*target),
224 OpRecord::ThreadCreate { state, .. } => Some(*state),
225 OpRecord::ThreadDelete { state, .. } => Some(*state),
226 OpRecord::ThreadUpdate { new_state, .. } => Some(*new_state),
227 OpRecord::Fork { new_state, .. } => Some(*new_state),
228 OpRecord::Collapse { result, .. } => Some(*result),
229 OpRecord::MarkerCreate { state, .. } => Some(*state),
230 OpRecord::MarkerDelete { state, .. } => Some(*state),
231 OpRecord::Checkpoint { state, .. } => Some(*state),
232 OpRecord::GitCheckpoint { state, .. } => Some(*state),
233 OpRecord::EphemeralThreadCollapse { final_state, .. } => Some(*final_state),
234 OpRecord::Redact { state, .. } => Some(*state),
235 OpRecord::StateVisibilitySet { state, .. }
236 | OpRecord::StateVisibilityPromote { state, .. } => Some(*state),
237 OpRecord::RemoteThreadUpdate { state, .. } | OpRecord::RemoteThreadDelete { state, .. } => {
238 Some(*state)
239 }
240 OpRecord::UndoRecoveryUpdate { state } => Some(*state),
241 OpRecord::TransactionAbort { .. }
242 | OpRecord::TransactionCommit { .. }
243 | OpRecord::ConflictResolved { .. }
244 | OpRecord::Purge { .. }
245 | OpRecord::FastForward { .. } => None,
246 }
247}
248
249#[tonic::async_trait]
250impl OperationLogQueryService for LocalOperationLogQueryService {
251 type StreamOperationsStream = Pin<Box<dyn Stream<Item = Result<OperationHit, Status>> + Send>>;
252
253 async fn query_operations(
254 &self,
255 request: Request<QueryOperationsRequest>,
256 ) -> Result<Response<QueryOperationsResponse>, Status> {
257 let req = request.into_inner();
258 let q = build_query(&req);
259 let hits = query_combined(self.inner.repo().heddle_dir(), &q).map_err(to_status)?;
260 let proto_hits = hits.into_iter().map(hit_to_proto).collect();
261 Ok(Response::new(QueryOperationsResponse { hits: proto_hits }))
262 }
263
264 async fn stream_operations(
265 &self,
266 request: Request<StreamOperationsRequest>,
267 ) -> Result<Response<Self::StreamOperationsStream>, Status> {
268 let req = request.into_inner();
269 let inner_req = req.query.unwrap_or_default();
270 let q = build_query(&inner_req);
271 let hits = query_combined(self.inner.repo().heddle_dir(), &q).map_err(to_status)?;
272
273 let (tx, rx) = tokio::sync::mpsc::channel(64);
274 tokio::spawn(async move {
275 for hit in hits {
276 if tx.send(Ok(hit_to_proto(hit))).await.is_err() {
277 break;
278 }
279 }
280 });
281 Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
282 }
283}
284
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use objects::object::ChangeId;
    use refs::operation_index::IndexedOperation;
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;

    use super::*;

    /// Builds an index record on thread `main` with one symbol, no signal
    /// kinds and a fixed change id.
    fn make_op(seq: u64, ts_secs: i64, actor: &str, verb: &str) -> IndexedOperation {
        IndexedOperation {
            seq,
            timestamp_secs: ts_secs,
            verb: verb.to_owned(),
            actor_email: actor.to_owned(),
            operation_id: None,
            thread: Some("main".into()),
            symbols: vec!["src/lib.rs:foo".into()],
            signal_kinds: Vec::new(),
            change_id: Some(ChangeId::from_bytes([1; 16])),
        }
    }

    /// Creates a service backed by a freshly initialised repository in a temp
    /// directory. The `TempDir` is returned so the caller keeps it alive.
    fn fresh_service() -> (TempDir, LocalOperationLogQueryService) {
        let workdir = TempDir::new().unwrap();
        let repository = Repository::init_default(workdir.path()).unwrap();
        let dedup_store = OperationDedupStore::open(repository.heddle_dir()).unwrap();
        let service = LocalOperationLogQueryService::new(GrpcLocalService::new(
            Arc::new(repository),
            Arc::new(dedup_store),
        ));
        (workdir, service)
    }

    /// Records `op` directly in the operation index.
    fn write_op(svc: &LocalOperationLogQueryService, op: IndexedOperation) {
        OperationLogIndex::new(svc.inner.repo().heddle_dir())
            .record(op)
            .unwrap();
    }

    /// Appends `op` to the live oplog only; the index is not touched.
    fn write_oplog_record(svc: &LocalOperationLogQueryService, op: OpRecord) {
        OpLog::new_unattributed(svc.inner.repo().heddle_dir())
            .record_batch(vec![op])
            .unwrap();
    }

    /// Issues `req` through the gRPC handler and unwraps the response body.
    async fn run_query(
        svc: &LocalOperationLogQueryService,
        req: QueryOperationsRequest,
    ) -> QueryOperationsResponse {
        svc.query_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner()
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn query_returns_records_within_window() {
        let (_workdir, svc) = fresh_service();
        let fixtures = [
            (1, 1_700_000_000, "alice@example.com"),
            (2, 1_700_000_060, "bob@example.com"),
            (3, 1_700_000_120, "carol@example.com"),
        ];
        for (seq, ts_secs, actor) in fixtures {
            write_op(&svc, make_op(seq, ts_secs, actor, "snapshot"));
        }

        let resp = run_query(
            &svc,
            QueryOperationsRequest {
                actor: "alice@example.com".into(),
                include_checkpoints: true,
                ..Default::default()
            },
        )
        .await;

        assert_eq!(resp.hits.len(), 1);
        assert_eq!(resp.hits[0].seq, 1);
        assert_eq!(resp.hits[0].actor_email, "alice@example.com");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn query_excludes_checkpoints_by_default_when_verbs_unset() {
        let (_workdir, svc) = fresh_service();
        for (seq, ts_secs, verb) in [
            (1, 1_700_000_000, "checkpoint"),
            (2, 1_700_000_060, "snapshot"),
        ] {
            write_op(&svc, make_op(seq, ts_secs, "alice@example.com", verb));
        }

        let resp = run_query(
            &svc,
            QueryOperationsRequest {
                include_checkpoints: false,
                ..Default::default()
            },
        )
        .await;

        assert_eq!(resp.hits.len(), 1);
        assert_eq!(resp.hits[0].verb, "snapshot");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn default_query_includes_newer_non_checkpoint_verbs() {
        let (_workdir, svc) = fresh_service();
        // The record exists only in the oplog, so the hit has to come from the
        // fallback scan and survive the default (checkpoint-free) verb filter.
        write_oplog_record(
            &svc,
            OpRecord::TransactionCommit {
                transaction_id: "tx-1".into(),
                op_count: 3,
            },
        );

        let resp = run_query(
            &svc,
            QueryOperationsRequest {
                include_checkpoints: false,
                ..Default::default()
            },
        )
        .await;

        assert_eq!(
            resp.hits.len(),
            1,
            "newer non-checkpoint verb must not be dropped"
        );
        assert_eq!(resp.hits[0].verb, "transaction_commit");
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn query_reads_live_oplog_when_index_is_empty() {
        let (_workdir, svc) = fresh_service();
        let state = ChangeId::from_bytes([2; 16]);
        write_oplog_record(
            &svc,
            OpRecord::Checkpoint {
                parent: None,
                state,
                thread: Some("main".into()),
            },
        );

        let resp = run_query(
            &svc,
            QueryOperationsRequest {
                include_checkpoints: true,
                ..Default::default()
            },
        )
        .await;

        assert_eq!(resp.hits.len(), 1);
        let hit = &resp.hits[0];
        assert_eq!(hit.verb, "checkpoint");
        assert_eq!(hit.thread, "main");
        assert_eq!(hit.change_id, state.as_bytes().to_vec());
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn stream_operations_yields_all_hits() {
        let (_workdir, svc) = fresh_service();
        for i in 0..5u64 {
            let ts_secs = 1_700_000_000 + (i as i64) * 60;
            write_op(&svc, make_op(i, ts_secs, "alice@example.com", "snapshot"));
        }

        let req = StreamOperationsRequest {
            repo_path: String::new(),
            query: Some(QueryOperationsRequest {
                include_checkpoints: true,
                ..Default::default()
            }),
        };
        let stream = svc
            .stream_operations(Request::new(req))
            .await
            .unwrap()
            .into_inner();

        let collected: Vec<_> = stream.map(|item| item.unwrap()).collect().await;
        assert_eq!(collected.len(), 5);
    }
}