1use crate::proto;
31use crate::proto::prost::Message as _;
32
/// Scope selector for propagated workflow history.
///
/// Mirrors the `OwnHistory` and `Lineage` variants of
/// `proto::HistoryPropagationScope`. The wire enum's `None` variant has no
/// counterpart here and is rejected by the `TryFrom` conversion.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HistoryPropagationScope {
    /// Maps to `proto::HistoryPropagationScope::OwnHistory`.
    // NOTE(review): presumably only the workflow's own history is propagated — confirm.
    OwnHistory,

    /// Maps to `proto::HistoryPropagationScope::Lineage`.
    // NOTE(review): presumably ancestor history is propagated as well — confirm.
    Lineage,
}
52
53impl From<HistoryPropagationScope> for proto::HistoryPropagationScope {
54 fn from(scope: HistoryPropagationScope) -> Self {
55 match scope {
56 HistoryPropagationScope::OwnHistory => proto::HistoryPropagationScope::OwnHistory,
57 HistoryPropagationScope::Lineage => proto::HistoryPropagationScope::Lineage,
58 }
59 }
60}
61
62impl TryFrom<proto::HistoryPropagationScope> for HistoryPropagationScope {
63 type Error = ();
64
65 fn try_from(scope: proto::HistoryPropagationScope) -> std::result::Result<Self, Self::Error> {
66 match scope {
67 proto::HistoryPropagationScope::OwnHistory => Ok(Self::OwnHistory),
68 proto::HistoryPropagationScope::Lineage => Ok(Self::Lineage),
69 proto::HistoryPropagationScope::None => Err(()),
70 }
71 }
72}
73
74impl HistoryPropagationScope {
75 pub(crate) fn to_proto(self) -> proto::HistoryPropagationScope {
77 self.into()
78 }
79
80 pub(crate) fn from_proto(scope: proto::HistoryPropagationScope) -> Option<Self> {
83 Self::try_from(scope).ok()
84 }
85}
86
/// Decoded view of one chunk of propagated history.
///
/// Built by `PropagatedHistory::from_proto` from a
/// `proto::PropagatedHistoryChunk`.
#[derive(Clone, Debug)]
pub struct PropagatedHistoryChunk {
    /// App id copied from the wire chunk.
    pub app_id: String,
    /// Instance id copied from the wire chunk.
    pub instance_id: String,
    /// Workflow name copied from the wire chunk.
    pub workflow_name: String,
    /// Offset into `PropagatedHistory::events` at which this chunk's events begin.
    pub start_event_index: i32,
    /// Event count recorded for this chunk by `PropagatedHistory::from_proto`.
    // NOTE(review): equals `events.len()` when every raw event decodes; confirm
    // the intended value when some raw events cannot be decoded.
    pub event_count: i32,
    /// Events of this chunk that decoded successfully, in wire order.
    pub events: Vec<proto::HistoryEvent>,
}
108
/// Propagated history decoded from a `proto::PropagatedHistory` message.
///
/// The same events are exposed twice: flattened in `events`, and grouped per
/// chunk in `chunks`.
#[derive(Clone, Debug)]
pub struct PropagatedHistory {
    /// Propagation scope taken from the wire message.
    pub scope: HistoryPropagationScope,
    /// Decoded events of all chunks, concatenated in chunk order.
    pub events: Vec<proto::HistoryEvent>,
    /// Per-chunk view, in the order the chunks appeared on the wire.
    pub chunks: Vec<PropagatedHistoryChunk>,
}
122
/// Error returned by the `PropagatedHistory` lookup methods when no chunk
/// matches the requested value.
///
/// Displays as `propagated history: {kind} '{name}' not found`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("propagated history: {kind} '{name}' not found")]
pub struct PropagationNotFoundError {
    /// Label for what was looked up; the lookups in this file use
    /// `"workflow"`, `"app id"` and `"instance id"`.
    pub kind: &'static str,
    /// The value that was searched for.
    pub name: String,
}
132
133impl PropagatedHistory {
134 pub fn from_proto(p: proto::PropagatedHistory) -> Option<Self> {
142 let scope = HistoryPropagationScope::from_proto(
143 proto::HistoryPropagationScope::try_from(p.scope).ok()?,
144 )?;
145
146 let mut all_events = Vec::new();
147 let mut chunks = Vec::with_capacity(p.chunks.len());
148
149 for raw in p.chunks {
150 let start_event_index = all_events.len() as i32;
151 let mut decoded = Vec::with_capacity(raw.raw_events.len());
152 for ev_bytes in &raw.raw_events {
153 if let Ok(ev) = proto::HistoryEvent::decode(ev_bytes.as_slice()) {
154 decoded.push(ev);
155 }
156 }
157 let event_count = raw.raw_events.len() as i32;
158 all_events.extend_from_slice(&decoded);
162 chunks.push(PropagatedHistoryChunk {
163 app_id: raw.app_id,
164 instance_id: raw.instance_id,
165 workflow_name: raw.workflow_name,
166 start_event_index,
167 event_count,
168 events: decoded,
169 });
170 }
171
172 Some(Self {
173 scope,
174 events: all_events,
175 chunks,
176 })
177 }
178
179 pub fn app_ids(&self) -> Vec<String> {
182 let mut seen = std::collections::HashSet::new();
183 self.chunks
184 .iter()
185 .filter(|c| seen.insert(c.app_id.as_str()))
186 .map(|c| c.app_id.clone())
187 .collect()
188 }
189
190 pub fn workflow_by_name(
195 &self,
196 name: &str,
197 ) -> Result<&PropagatedHistoryChunk, PropagationNotFoundError> {
198 self.chunks
199 .iter()
200 .find(|c| c.workflow_name == name)
201 .ok_or_else(|| PropagationNotFoundError {
202 kind: "workflow",
203 name: name.to_string(),
204 })
205 }
206
207 pub fn events_by_app_id(
209 &self,
210 app_id: &str,
211 ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
212 let mut out = Vec::new();
213 let mut found = false;
214 for c in &self.chunks {
215 if c.app_id == app_id {
216 found = true;
217 out.extend(c.events.iter().cloned());
218 }
219 }
220 if found {
221 Ok(out)
222 } else {
223 Err(PropagationNotFoundError {
224 kind: "app id",
225 name: app_id.to_string(),
226 })
227 }
228 }
229
230 pub fn events_by_instance_id(
232 &self,
233 instance_id: &str,
234 ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
235 let mut out = Vec::new();
236 let mut found = false;
237 for c in &self.chunks {
238 if c.instance_id == instance_id {
239 found = true;
240 out.extend(c.events.iter().cloned());
241 }
242 }
243 if found {
244 Ok(out)
245 } else {
246 Err(PropagationNotFoundError {
247 kind: "instance id",
248 name: instance_id.to_string(),
249 })
250 }
251 }
252
253 pub fn events_by_workflow_name(
256 &self,
257 name: &str,
258 ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
259 let mut out = Vec::new();
260 let mut found = false;
261 for c in &self.chunks {
262 if c.workflow_name == name {
263 found = true;
264 out.extend(c.events.iter().cloned());
265 }
266 }
267 if found {
268 Ok(out)
269 } else {
270 Err(PropagationNotFoundError {
271 kind: "workflow",
272 name: name.to_string(),
273 })
274 }
275 }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::prost::Message;

    /// Builds a history event that carries nothing but its id.
    fn ev(id: i32) -> proto::HistoryEvent {
        proto::HistoryEvent {
            event_id: id,
            timestamp: None,
            router: None,
            event_type: None,
        }
    }

    /// Builds a wire chunk holding `n` encoded events with ids `0..n`.
    fn raw_chunk(app: &str, inst: &str, wf: &str, n: i32) -> proto::PropagatedHistoryChunk {
        let mut raw_events = Vec::new();
        for id in 0..n {
            raw_events.push(ev(id).encode_to_vec());
        }
        proto::PropagatedHistoryChunk {
            raw_events,
            app_id: app.to_owned(),
            instance_id: inst.to_owned(),
            workflow_name: wf.to_owned(),
            raw_signatures: Vec::new(),
            signing_cert_chains: Vec::new(),
        }
    }

    /// Wraps chunks into a wire-level history with the given scope.
    fn wire_history(
        scope: proto::HistoryPropagationScope,
        chunks: Vec<proto::PropagatedHistoryChunk>,
    ) -> proto::PropagatedHistory {
        proto::PropagatedHistory {
            scope: scope as i32,
            chunks,
        }
    }

    #[test]
    fn from_proto_none_returns_none() {
        let wire = wire_history(proto::HistoryPropagationScope::None, Vec::new());
        assert!(PropagatedHistory::from_proto(wire).is_none());
    }

    #[test]
    fn from_proto_decodes_chunks_and_flattens_events() {
        let wire = wire_history(
            proto::HistoryPropagationScope::Lineage,
            vec![
                raw_chunk("app-a", "inst-a", "WfA", 2),
                raw_chunk("app-b", "inst-b", "WfB", 3),
            ],
        );
        let history = PropagatedHistory::from_proto(wire).expect("scope set");

        assert_eq!(history.scope, HistoryPropagationScope::Lineage);
        assert_eq!(history.events.len(), 5);
        assert_eq!(history.chunks.len(), 2);

        // (start_event_index, event_count) per chunk, in chunk order.
        let spans: Vec<(i32, i32)> = history
            .chunks
            .iter()
            .map(|chunk| (chunk.start_event_index, chunk.event_count))
            .collect();
        assert_eq!(spans, vec![(0, 2), (2, 3)]);
    }

    #[test]
    fn app_ids_are_deduplicated_in_chain_order() {
        let wire = wire_history(
            proto::HistoryPropagationScope::Lineage,
            vec![
                raw_chunk("app-a", "i1", "Wf1", 1),
                raw_chunk("app-b", "i2", "Wf2", 1),
                raw_chunk("app-a", "i3", "Wf3", 1),
            ],
        );
        let history = PropagatedHistory::from_proto(wire).unwrap();

        let expected = vec![String::from("app-a"), String::from("app-b")];
        assert_eq!(history.app_ids(), expected);
    }

    #[test]
    fn filters_return_not_found_for_missing_names() {
        let wire = wire_history(
            proto::HistoryPropagationScope::OwnHistory,
            vec![raw_chunk("app-a", "inst", "WfA", 1)],
        );
        let history = PropagatedHistory::from_proto(wire).unwrap();

        assert!(history.workflow_by_name("missing").is_err());
        assert!(history.events_by_app_id("missing").is_err());
        assert!(history.events_by_instance_id("missing").is_err());
        assert!(history.events_by_workflow_name("missing").is_err());
    }

    #[test]
    fn filters_return_matching_events() {
        let wire = wire_history(
            proto::HistoryPropagationScope::Lineage,
            vec![
                raw_chunk("app-a", "inst-a", "WfA", 2),
                raw_chunk("app-b", "inst-b", "WfB", 3),
            ],
        );
        let history = PropagatedHistory::from_proto(wire).unwrap();

        let by_app = history.events_by_app_id("app-a").unwrap();
        assert_eq!(by_app.len(), 2);

        let by_instance = history.events_by_instance_id("inst-b").unwrap();
        assert_eq!(by_instance.len(), 3);

        let by_workflow = history.events_by_workflow_name("WfA").unwrap();
        assert_eq!(by_workflow.len(), 2);

        let chunk = history.workflow_by_name("WfB").unwrap();
        assert_eq!(chunk.instance_id, "inst-b");
    }
}