1use crate::proto;
31use crate::proto::prost::Message as _;
32
33#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
41pub enum HistoryPropagationScope {
42 OwnHistory,
46
47 Lineage,
51}
52
53impl HistoryPropagationScope {
54 pub(crate) fn to_proto(self) -> proto::HistoryPropagationScope {
56 match self {
57 Self::OwnHistory => proto::HistoryPropagationScope::OwnHistory,
58 Self::Lineage => proto::HistoryPropagationScope::Lineage,
59 }
60 }
61
62 pub(crate) fn from_proto(scope: proto::HistoryPropagationScope) -> Option<Self> {
65 match scope {
66 proto::HistoryPropagationScope::OwnHistory => Some(Self::OwnHistory),
67 proto::HistoryPropagationScope::Lineage => Some(Self::Lineage),
68 proto::HistoryPropagationScope::None => None,
69 }
70 }
71}
72
73#[derive(Clone, Debug)]
79pub struct PropagatedHistoryChunk {
80 pub app_id: String,
82 pub instance_id: String,
84 pub workflow_name: String,
86 pub start_event_index: i32,
89 pub event_count: i32,
91 pub events: Vec<proto::HistoryEvent>,
93}
94
95#[derive(Clone, Debug)]
100pub struct PropagatedHistory {
101 pub scope: HistoryPropagationScope,
103 pub events: Vec<proto::HistoryEvent>,
105 pub chunks: Vec<PropagatedHistoryChunk>,
107}
108
109#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
111#[error("propagated history: {kind} '{name}' not found")]
112pub struct PropagationNotFoundError {
113 pub kind: &'static str,
115 pub name: String,
117}
118
119impl PropagatedHistory {
120 pub fn from_proto(p: proto::PropagatedHistory) -> Option<Self> {
128 let scope = HistoryPropagationScope::from_proto(
129 proto::HistoryPropagationScope::try_from(p.scope).ok()?,
130 )?;
131
132 let mut all_events = Vec::new();
133 let mut chunks = Vec::with_capacity(p.chunks.len());
134
135 for raw in p.chunks {
136 let start_event_index = all_events.len() as i32;
137 let mut decoded = Vec::with_capacity(raw.raw_events.len());
138 for ev_bytes in &raw.raw_events {
139 if let Ok(ev) = proto::HistoryEvent::decode(ev_bytes.as_slice()) {
140 decoded.push(ev);
141 }
142 }
143 let event_count = raw.raw_events.len() as i32;
144 all_events.extend(decoded.iter().cloned());
145 chunks.push(PropagatedHistoryChunk {
146 app_id: raw.app_id,
147 instance_id: raw.instance_id,
148 workflow_name: raw.workflow_name,
149 start_event_index,
150 event_count,
151 events: decoded,
152 });
153 }
154
155 Some(Self {
156 scope,
157 events: all_events,
158 chunks,
159 })
160 }
161
162 pub fn app_ids(&self) -> Vec<String> {
165 let mut seen = std::collections::HashSet::new();
166 let mut out = Vec::new();
167 for c in &self.chunks {
168 if seen.insert(c.app_id.clone()) {
169 out.push(c.app_id.clone());
170 }
171 }
172 out
173 }
174
175 pub fn workflow_by_name(
180 &self,
181 name: &str,
182 ) -> Result<&PropagatedHistoryChunk, PropagationNotFoundError> {
183 self.chunks
184 .iter()
185 .find(|c| c.workflow_name == name)
186 .ok_or_else(|| PropagationNotFoundError {
187 kind: "workflow",
188 name: name.to_string(),
189 })
190 }
191
192 pub fn events_by_app_id(
194 &self,
195 app_id: &str,
196 ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
197 let mut out = Vec::new();
198 let mut found = false;
199 for c in &self.chunks {
200 if c.app_id == app_id {
201 found = true;
202 out.extend(c.events.iter().cloned());
203 }
204 }
205 if found {
206 Ok(out)
207 } else {
208 Err(PropagationNotFoundError {
209 kind: "app id",
210 name: app_id.to_string(),
211 })
212 }
213 }
214
215 pub fn events_by_instance_id(
217 &self,
218 instance_id: &str,
219 ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
220 let mut out = Vec::new();
221 let mut found = false;
222 for c in &self.chunks {
223 if c.instance_id == instance_id {
224 found = true;
225 out.extend(c.events.iter().cloned());
226 }
227 }
228 if found {
229 Ok(out)
230 } else {
231 Err(PropagationNotFoundError {
232 kind: "instance id",
233 name: instance_id.to_string(),
234 })
235 }
236 }
237
238 pub fn events_by_workflow_name(
241 &self,
242 name: &str,
243 ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
244 let mut out = Vec::new();
245 let mut found = false;
246 for c in &self.chunks {
247 if c.workflow_name == name {
248 found = true;
249 out.extend(c.events.iter().cloned());
250 }
251 }
252 if found {
253 Ok(out)
254 } else {
255 Err(PropagationNotFoundError {
256 kind: "workflow",
257 name: name.to_string(),
258 })
259 }
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266 use crate::proto::prost::Message;
267
268 fn ev(id: i32) -> proto::HistoryEvent {
269 proto::HistoryEvent {
270 event_id: id,
271 timestamp: None,
272 router: None,
273 event_type: None,
274 }
275 }
276
277 fn raw_chunk(app: &str, inst: &str, wf: &str, n: i32) -> proto::PropagatedHistoryChunk {
278 let raw_events = (0..n).map(|i| ev(i).encode_to_vec()).collect();
279 proto::PropagatedHistoryChunk {
280 raw_events,
281 app_id: app.to_string(),
282 instance_id: inst.to_string(),
283 workflow_name: wf.to_string(),
284 raw_signatures: vec![],
285 signing_cert_chains: vec![],
286 }
287 }
288
289 #[test]
290 fn from_proto_none_returns_none() {
291 let p = proto::PropagatedHistory {
292 scope: proto::HistoryPropagationScope::None as i32,
293 chunks: vec![],
294 };
295 assert!(PropagatedHistory::from_proto(p).is_none());
296 }
297
298 #[test]
299 fn from_proto_decodes_chunks_and_flattens_events() {
300 let p = proto::PropagatedHistory {
301 scope: proto::HistoryPropagationScope::Lineage as i32,
302 chunks: vec![
303 raw_chunk("app-a", "inst-a", "WfA", 2),
304 raw_chunk("app-b", "inst-b", "WfB", 3),
305 ],
306 };
307 let h = PropagatedHistory::from_proto(p).expect("scope set");
308 assert_eq!(h.scope, HistoryPropagationScope::Lineage);
309 assert_eq!(h.events.len(), 5);
310 assert_eq!(h.chunks.len(), 2);
311 assert_eq!(h.chunks[0].start_event_index, 0);
312 assert_eq!(h.chunks[0].event_count, 2);
313 assert_eq!(h.chunks[1].start_event_index, 2);
314 assert_eq!(h.chunks[1].event_count, 3);
315 }
316
317 #[test]
318 fn app_ids_are_deduplicated_in_chain_order() {
319 let p = proto::PropagatedHistory {
320 scope: proto::HistoryPropagationScope::Lineage as i32,
321 chunks: vec![
322 raw_chunk("app-a", "i1", "Wf1", 1),
323 raw_chunk("app-b", "i2", "Wf2", 1),
324 raw_chunk("app-a", "i3", "Wf3", 1),
325 ],
326 };
327 let h = PropagatedHistory::from_proto(p).unwrap();
328 assert_eq!(h.app_ids(), vec!["app-a".to_string(), "app-b".to_string()]);
329 }
330
331 #[test]
332 fn filters_return_not_found_for_missing_names() {
333 let p = proto::PropagatedHistory {
334 scope: proto::HistoryPropagationScope::OwnHistory as i32,
335 chunks: vec![raw_chunk("app-a", "inst", "WfA", 1)],
336 };
337 let h = PropagatedHistory::from_proto(p).unwrap();
338 assert!(h.workflow_by_name("missing").is_err());
339 assert!(h.events_by_app_id("missing").is_err());
340 assert!(h.events_by_instance_id("missing").is_err());
341 assert!(h.events_by_workflow_name("missing").is_err());
342 }
343
344 #[test]
345 fn filters_return_matching_events() {
346 let p = proto::PropagatedHistory {
347 scope: proto::HistoryPropagationScope::Lineage as i32,
348 chunks: vec![
349 raw_chunk("app-a", "inst-a", "WfA", 2),
350 raw_chunk("app-b", "inst-b", "WfB", 3),
351 ],
352 };
353 let h = PropagatedHistory::from_proto(p).unwrap();
354 assert_eq!(h.events_by_app_id("app-a").unwrap().len(), 2);
355 assert_eq!(h.events_by_instance_id("inst-b").unwrap().len(), 3);
356 assert_eq!(h.events_by_workflow_name("WfA").unwrap().len(), 2);
357 assert_eq!(h.workflow_by_name("WfB").unwrap().instance_id, "inst-b");
358 }
359}