1use std::{collections::BTreeMap, path::Path};
5
6use chrono::TimeZone;
7use objects::{
8 error::Result,
9 object::{ChangeId, OperationId},
10};
11use oplog::{OpEntry, OpLog, OpLogBackend, OpRecord};
12use refs::refs::{IndexedOperation, OperationLogIndex, OperationLogQuery};
13use serde::Serialize;
14
15use crate::ExecutionContext;
16
17#[derive(Debug, Clone, Default, PartialEq, Eq)]
19pub struct QueryRequest {
20 pub actor: String,
21 pub symbol: String,
22 pub signal_kind: String,
23 pub thread: String,
24 pub verbs: Vec<String>,
25 pub since_secs: i64,
26 pub until_secs: i64,
27 pub limit: u32,
28 pub include_checkpoints: bool,
29}
30
31#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
32pub struct QueryReport {
33 pub output_kind: &'static str,
34 pub hits: Vec<QueryHit>,
35}
36
37#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
38pub struct QueryHit {
39 pub seq: u64,
40 pub timestamp_secs: i64,
41 pub verb: String,
42 pub actor_email: String,
43 pub operation_id: Option<String>,
44 pub thread: Option<String>,
45 pub symbols: Vec<String>,
46 pub signal_kinds: Vec<String>,
47 pub change_id: Option<String>,
48}
49
50const OPLOG_FALLBACK_SCAN_WINDOW: usize = 100_000;
54
55pub fn query(ctx: &ExecutionContext, req: QueryRequest) -> Result<QueryReport> {
56 let repo = ctx.require_repo()?;
57 let q = build_query(&req);
58 let hits = query_combined(repo.heddle_dir(), &q)?;
59 Ok(QueryReport {
60 output_kind: "query",
61 hits: hits.into_iter().map(hit_to_report).collect(),
62 })
63}
64
65fn build_query(req: &QueryRequest) -> OperationLogQuery {
66 let mut q = OperationLogQuery {
67 actor: (!req.actor.is_empty()).then(|| req.actor.clone()),
68 symbol: (!req.symbol.is_empty()).then(|| req.symbol.clone()),
69 signal_kind: (!req.signal_kind.is_empty()).then(|| req.signal_kind.clone()),
70 thread: (!req.thread.is_empty()).then(|| req.thread.clone()),
71 verbs: (!req.verbs.is_empty()).then(|| req.verbs.clone()),
72 since: parse_unix_secs(req.since_secs),
73 until: parse_unix_secs(req.until_secs),
74 limit: (req.limit > 0).then_some(req.limit as usize),
75 };
76 if !req.include_checkpoints && q.verbs.is_none() {
77 q.verbs = Some(
78 OpRecord::verbs(false)
79 .iter()
80 .map(|s| s.to_string())
81 .collect(),
82 );
83 }
84 q
85}
86
87fn parse_unix_secs(secs: i64) -> Option<chrono::DateTime<chrono::Utc>> {
88 if secs == 0 {
89 return None;
90 }
91 chrono::Utc.timestamp_opt(secs, 0).single()
92}
93
94fn query_combined(heddle_dir: &Path, query: &OperationLogQuery) -> Result<Vec<IndexedOperation>> {
95 let index = OperationLogIndex::new(heddle_dir);
96 let mut unbounded = query.clone();
97 unbounded.limit = None;
98
99 let mut by_seq = BTreeMap::new();
100 for hit in index.query(&unbounded)? {
101 by_seq.insert(hit.seq, hit);
102 }
103
104 if unbounded.symbol.is_none() && unbounded.signal_kind.is_none() {
105 for hit in query_oplog_fallback(heddle_dir, &unbounded)? {
106 by_seq.entry(hit.seq).or_insert(hit);
107 }
108 }
109
110 let mut hits: Vec<_> = by_seq.into_values().collect();
111 hits.sort_by_key(|hit| hit.seq);
112 if let Some(limit) = query.limit {
113 hits.truncate(limit);
114 }
115 Ok(hits)
116}
117
118fn query_oplog_fallback(
119 heddle_dir: &Path,
120 query: &OperationLogQuery,
121) -> Result<Vec<IndexedOperation>> {
122 let log = OpLog::new_unattributed(heddle_dir);
123 let mut entries = log.recent(OPLOG_FALLBACK_SCAN_WINDOW)?;
124 entries.reverse();
125 let mut hits = Vec::new();
126 for entry in entries {
127 let hit = indexed_from_oplog_entry(&entry);
128 if indexed_operation_matches(&hit, query) {
129 hits.push(hit);
130 }
131 }
132 Ok(hits)
133}
134
135fn indexed_from_oplog_entry(entry: &OpEntry) -> IndexedOperation {
136 IndexedOperation {
137 seq: entry.id,
138 timestamp_secs: entry.timestamp.timestamp(),
139 verb: entry.operation.verb().to_string(),
140 actor_email: entry.actor.email.clone(),
141 operation_id: entry.operation_id,
142 thread: thread_for(&entry.operation),
143 symbols: Vec::new(),
144 signal_kinds: Vec::new(),
145 change_id: primary_change_id(&entry.operation),
146 }
147}
148
149fn indexed_operation_matches(hit: &IndexedOperation, query: &OperationLogQuery) -> bool {
150 if let Some(actor) = &query.actor
151 && &hit.actor_email != actor
152 {
153 return false;
154 }
155 if let Some(symbol) = &query.symbol
156 && !hit.symbols.iter().any(|candidate| candidate == symbol)
157 {
158 return false;
159 }
160 if let Some(kind) = &query.signal_kind
161 && !hit.signal_kinds.iter().any(|candidate| candidate == kind)
162 {
163 return false;
164 }
165 if let Some(thread) = &query.thread
166 && hit.thread.as_deref() != Some(thread.as_str())
167 {
168 return false;
169 }
170 if let Some(verbs) = &query.verbs
171 && !verbs.iter().any(|verb| verb == &hit.verb)
172 {
173 return false;
174 }
175 let timestamp = hit.timestamp();
176 if let Some(start) = query.since
177 && timestamp < start
178 {
179 return false;
180 }
181 if let Some(end) = query.until
182 && timestamp > end
183 {
184 return false;
185 }
186 true
187}
188
189fn hit_to_report(hit: IndexedOperation) -> QueryHit {
190 QueryHit {
191 seq: hit.seq,
192 timestamp_secs: hit.timestamp_secs,
193 verb: hit.verb,
194 actor_email: hit.actor_email,
195 operation_id: hit.operation_id.map(operation_id_to_string),
196 thread: hit.thread,
197 symbols: hit.symbols,
198 signal_kinds: hit.signal_kinds,
199 change_id: hit.change_id.map(|id| id.to_string_full()),
200 }
201}
202
203fn operation_id_to_string(id: OperationId) -> String {
204 id.to_string()
205}
206
207fn thread_for(op: &OpRecord) -> Option<String> {
208 match op {
209 OpRecord::Snapshot { thread, .. } => thread.clone(),
210 OpRecord::ThreadCreate { name, .. } => Some(name.clone()),
211 OpRecord::ThreadDelete { name, .. } => Some(name.clone()),
212 OpRecord::ThreadUpdate { name, .. } => Some(name.clone()),
213 OpRecord::MarkerCreate { name, .. } => Some(name.clone()),
214 OpRecord::MarkerDelete { name, .. } => Some(name.clone()),
215 OpRecord::Checkpoint { thread, .. } => thread.clone(),
216 OpRecord::EphemeralThreadCollapse { thread, .. } => Some(thread.clone()),
217 OpRecord::FastForward { target_thread, .. } => Some(target_thread.clone()),
218 OpRecord::GitCheckpoint { branch, .. } => Some(branch.clone()),
219 OpRecord::RemoteThreadUpdate { thread, .. }
220 | OpRecord::RemoteThreadDelete { thread, .. } => Some(thread.clone()),
221 OpRecord::Goto { .. }
222 | OpRecord::Fork { .. }
223 | OpRecord::Collapse { .. }
224 | OpRecord::TransactionAbort { .. }
225 | OpRecord::TransactionCommit { .. }
226 | OpRecord::ConflictResolved { .. }
227 | OpRecord::Redact { .. }
228 | OpRecord::UndoRecoveryUpdate { .. }
229 | OpRecord::StateVisibilitySet { .. }
230 | OpRecord::StateVisibilityPromote { .. }
231 | OpRecord::Purge { .. } => None,
232 }
233}
234
235fn primary_change_id(op: &OpRecord) -> Option<ChangeId> {
236 match op {
237 OpRecord::Snapshot { new_state, .. } => Some(*new_state),
238 OpRecord::Goto { target, .. } => Some(*target),
239 OpRecord::ThreadCreate { state, .. } => Some(*state),
240 OpRecord::ThreadDelete { state, .. } => Some(*state),
241 OpRecord::ThreadUpdate { new_state, .. } => Some(*new_state),
242 OpRecord::Fork { new_state, .. } => Some(*new_state),
243 OpRecord::Collapse { result, .. } => Some(*result),
244 OpRecord::MarkerCreate { state, .. } => Some(*state),
245 OpRecord::MarkerDelete { state, .. } => Some(*state),
246 OpRecord::Checkpoint { state, .. } => Some(*state),
247 OpRecord::GitCheckpoint { state, .. } => Some(*state),
248 OpRecord::EphemeralThreadCollapse { final_state, .. } => Some(*final_state),
249 OpRecord::Redact { state, .. } => Some(*state),
250 OpRecord::StateVisibilitySet { state, .. }
251 | OpRecord::StateVisibilityPromote { state, .. } => Some(*state),
252 OpRecord::RemoteThreadUpdate { state, .. } | OpRecord::RemoteThreadDelete { state, .. } => {
253 Some(*state)
254 }
255 OpRecord::UndoRecoveryUpdate { state } => Some(*state),
256 OpRecord::TransactionAbort { .. }
257 | OpRecord::TransactionCommit { .. }
258 | OpRecord::ConflictResolved { .. }
259 | OpRecord::Purge { .. }
260 | OpRecord::FastForward { .. } => None,
261 }
262}