Skip to main content

dapr_durabletask/api/
history_propagation.rs

//! Workflow history propagation: scope enum, propagated history struct, and
//! convenience filters.
//!
//! Dapr Workflows can propagate execution history from a parent workflow to
//! its child workflows and activities. Two scopes are supported:
//!
//! * [`HistoryPropagationScope::OwnHistory`] — only the caller's events are
//!   forwarded; ancestral history is dropped (a trust boundary).
//! * [`HistoryPropagationScope::Lineage`] — the caller's events plus the full
//!   ancestor chain are forwarded.
//!
//! Parent (schedule-side):
//! ```ignore
//! ctx.call_activity_with_options(
//!     "verify",
//!     input,
//!     ActivityOptions::new().with_history_propagation(HistoryPropagationScope::Lineage),
//! );
//! ```
//!
//! Child / activity (receive-side):
//! ```ignore
//! if let Some(history) = ctx.propagated_history() {
//!     for app in history.app_ids() {
//!         println!("ancestor app: {app}");
//!     }
//! }
//! ```
29
30use crate::proto;
31use crate::proto::prost::Message as _;
32
/// Selects which history a calling workflow forwards to the activity or child
/// workflow it schedules.
///
/// This is the public counterpart of the proto `HistoryPropagationScope` enum;
/// the raw `i32` discriminants stay hidden. There is deliberately no `None`
/// variant: a caller either names a scope explicitly or leaves the option
/// unset, and leaving it unset is equivalent to "no propagation".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HistoryPropagationScope {
    /// Forward the caller's own history events only.
    ///
    /// Ancestral history (anything the caller itself received from its
    /// parent) is dropped, which makes this scope a trust boundary.
    OwnHistory,

    /// Forward the caller's own history events together with the full
    /// ancestor chain.
    ///
    /// Propagated history that the caller received is passed along as
    /// additional chunks next to the caller's own.
    Lineage,
}
52
53impl HistoryPropagationScope {
54    /// Convert to the wire-format proto enum value.
55    pub(crate) fn to_proto(self) -> proto::HistoryPropagationScope {
56        match self {
57            Self::OwnHistory => proto::HistoryPropagationScope::OwnHistory,
58            Self::Lineage => proto::HistoryPropagationScope::Lineage,
59        }
60    }
61
62    /// Convert from a proto enum value, returning `None` for `SCOPE_NONE` or
63    /// unknown variants.
64    pub(crate) fn from_proto(scope: proto::HistoryPropagationScope) -> Option<Self> {
65        match scope {
66            proto::HistoryPropagationScope::OwnHistory => Some(Self::OwnHistory),
67            proto::HistoryPropagationScope::Lineage => Some(Self::Lineage),
68            proto::HistoryPropagationScope::None => None,
69        }
70    }
71}
72
73/// A single per-app slice of propagated history.
74///
75/// One chunk corresponds to all events produced by a single workflow
76/// instance running on a single Dapr app. When `Lineage` is used, multiple
77/// chunks describe the full ancestor chain in execution order.
78#[derive(Clone, Debug)]
79pub struct PropagatedHistoryChunk {
80    /// The Dapr app ID that produced these events.
81    pub app_id: String,
82    /// The workflow instance ID that produced these events.
83    pub instance_id: String,
84    /// The workflow function name that produced these events.
85    pub workflow_name: String,
86    /// Index of the first event in this chunk relative to the propagated
87    /// stream as a whole.
88    pub start_event_index: i32,
89    /// Number of events in this chunk.
90    pub event_count: i32,
91    /// Decoded history events for this chunk, in execution order.
92    pub events: Vec<proto::HistoryEvent>,
93}
94
95/// Propagated execution history delivered to a child workflow or activity.
96///
97/// Use [`PropagatedHistory::events`] for the flat event stream, or any of the
98/// `events_by_*` / `workflow_by_name` filters to slice it by chunk metadata.
99#[derive(Clone, Debug)]
100pub struct PropagatedHistory {
101    /// The propagation scope the parent used when scheduling this work item.
102    pub scope: HistoryPropagationScope,
103    /// All propagated events flattened in execution order.
104    pub events: Vec<proto::HistoryEvent>,
105    /// Per-app/per-instance chunk metadata.
106    pub chunks: Vec<PropagatedHistoryChunk>,
107}
108
109/// Returned when a `*_by_name` filter cannot find a matching workflow or app.
110#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
111#[error("propagated history: {kind} '{name}' not found")]
112pub struct PropagationNotFoundError {
113    /// Human-readable kind of the missing entity ("workflow", "app id", etc.).
114    pub kind: &'static str,
115    /// The name that was searched for.
116    pub name: String,
117}
118
119impl PropagatedHistory {
120    /// Build a `PropagatedHistory` from the wire-format proto.
121    ///
122    /// Returns `None` if the proto's scope is `SCOPE_NONE` (no propagation
123    /// happened) or unknown. Each chunk's raw bytes are decoded once into
124    /// typed [`HistoryEvent`](proto::HistoryEvent)s; malformed events are
125    /// silently dropped (the chunk's `event_count` reflects the original wire
126    /// count so the caller can detect drift).
127    pub fn from_proto(p: proto::PropagatedHistory) -> Option<Self> {
128        let scope = HistoryPropagationScope::from_proto(
129            proto::HistoryPropagationScope::try_from(p.scope).ok()?,
130        )?;
131
132        let mut all_events = Vec::new();
133        let mut chunks = Vec::with_capacity(p.chunks.len());
134
135        for raw in p.chunks {
136            let start_event_index = all_events.len() as i32;
137            let mut decoded = Vec::with_capacity(raw.raw_events.len());
138            for ev_bytes in &raw.raw_events {
139                if let Ok(ev) = proto::HistoryEvent::decode(ev_bytes.as_slice()) {
140                    decoded.push(ev);
141                }
142            }
143            let event_count = raw.raw_events.len() as i32;
144            all_events.extend(decoded.iter().cloned());
145            chunks.push(PropagatedHistoryChunk {
146                app_id: raw.app_id,
147                instance_id: raw.instance_id,
148                workflow_name: raw.workflow_name,
149                start_event_index,
150                event_count,
151                events: decoded,
152            });
153        }
154
155        Some(Self {
156            scope,
157            events: all_events,
158            chunks,
159        })
160    }
161
162    /// Deduplicated list of app IDs in the propagated chain, in chunk order
163    /// (earliest ancestor first).
164    pub fn app_ids(&self) -> Vec<String> {
165        let mut seen = std::collections::HashSet::new();
166        let mut out = Vec::new();
167        for c in &self.chunks {
168            if seen.insert(c.app_id.clone()) {
169                out.push(c.app_id.clone());
170            }
171        }
172        out
173    }
174
175    /// Return the chunk produced by a workflow with the given function name.
176    ///
177    /// If multiple chunks share the same name (re-entrant ancestor calls) the
178    /// first match in chain order is returned.
179    pub fn workflow_by_name(
180        &self,
181        name: &str,
182    ) -> Result<&PropagatedHistoryChunk, PropagationNotFoundError> {
183        self.chunks
184            .iter()
185            .find(|c| c.workflow_name == name)
186            .ok_or_else(|| PropagationNotFoundError {
187                kind: "workflow",
188                name: name.to_string(),
189            })
190    }
191
192    /// All events from chunks tagged with the given Dapr app ID.
193    pub fn events_by_app_id(
194        &self,
195        app_id: &str,
196    ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
197        let mut out = Vec::new();
198        let mut found = false;
199        for c in &self.chunks {
200            if c.app_id == app_id {
201                found = true;
202                out.extend(c.events.iter().cloned());
203            }
204        }
205        if found {
206            Ok(out)
207        } else {
208            Err(PropagationNotFoundError {
209                kind: "app id",
210                name: app_id.to_string(),
211            })
212        }
213    }
214
215    /// All events from the chunk with the given instance ID.
216    pub fn events_by_instance_id(
217        &self,
218        instance_id: &str,
219    ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
220        let mut out = Vec::new();
221        let mut found = false;
222        for c in &self.chunks {
223            if c.instance_id == instance_id {
224                found = true;
225                out.extend(c.events.iter().cloned());
226            }
227        }
228        if found {
229            Ok(out)
230        } else {
231            Err(PropagationNotFoundError {
232                kind: "instance id",
233                name: instance_id.to_string(),
234            })
235        }
236    }
237
238    /// All events from chunks produced by a workflow with the given function
239    /// name.
240    pub fn events_by_workflow_name(
241        &self,
242        name: &str,
243    ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
244        let mut out = Vec::new();
245        let mut found = false;
246        for c in &self.chunks {
247            if c.workflow_name == name {
248                found = true;
249                out.extend(c.events.iter().cloned());
250            }
251        }
252        if found {
253            Ok(out)
254        } else {
255            Err(PropagationNotFoundError {
256                kind: "workflow",
257                name: name.to_string(),
258            })
259        }
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::prost::Message;

    /// Minimal history event that carries nothing but an event id.
    fn ev(id: i32) -> proto::HistoryEvent {
        proto::HistoryEvent {
            event_id: id,
            timestamp: None,
            router: None,
            event_type: None,
        }
    }

    /// Wire-format chunk holding `n` encoded events with ids `0..n`.
    fn raw_chunk(app: &str, inst: &str, wf: &str, n: i32) -> proto::PropagatedHistoryChunk {
        proto::PropagatedHistoryChunk {
            raw_events: (0..n).map(|i| ev(i).encode_to_vec()).collect(),
            app_id: app.to_string(),
            instance_id: inst.to_string(),
            workflow_name: wf.to_string(),
            raw_signatures: vec![],
            signing_cert_chains: vec![],
        }
    }

    /// Run `PropagatedHistory::from_proto` on a wire message assembled from
    /// `scope` and `chunks`.
    fn decode(
        scope: proto::HistoryPropagationScope,
        chunks: Vec<proto::PropagatedHistoryChunk>,
    ) -> Option<PropagatedHistory> {
        PropagatedHistory::from_proto(proto::PropagatedHistory {
            scope: scope as i32,
            chunks,
        })
    }

    #[test]
    fn from_proto_none_returns_none() {
        let decoded = decode(proto::HistoryPropagationScope::None, vec![]);
        assert!(decoded.is_none());
    }

    #[test]
    fn from_proto_decodes_chunks_and_flattens_events() {
        let h = decode(
            proto::HistoryPropagationScope::Lineage,
            vec![
                raw_chunk("app-a", "inst-a", "WfA", 2),
                raw_chunk("app-b", "inst-b", "WfB", 3),
            ],
        )
        .expect("scope set");

        assert_eq!(h.scope, HistoryPropagationScope::Lineage);
        assert_eq!(h.events.len(), 5);
        assert_eq!(h.chunks.len(), 2);

        let (first, second) = (&h.chunks[0], &h.chunks[1]);
        assert_eq!(first.start_event_index, 0);
        assert_eq!(first.event_count, 2);
        assert_eq!(second.start_event_index, 2);
        assert_eq!(second.event_count, 3);
    }

    #[test]
    fn app_ids_are_deduplicated_in_chain_order() {
        let h = decode(
            proto::HistoryPropagationScope::Lineage,
            vec![
                raw_chunk("app-a", "i1", "Wf1", 1),
                raw_chunk("app-b", "i2", "Wf2", 1),
                raw_chunk("app-a", "i3", "Wf3", 1),
            ],
        )
        .unwrap();

        assert_eq!(h.app_ids(), vec!["app-a".to_string(), "app-b".to_string()]);
    }

    #[test]
    fn filters_return_not_found_for_missing_names() {
        let h = decode(
            proto::HistoryPropagationScope::OwnHistory,
            vec![raw_chunk("app-a", "inst", "WfA", 1)],
        )
        .unwrap();

        assert!(h.workflow_by_name("missing").is_err());
        assert!(h.events_by_app_id("missing").is_err());
        assert!(h.events_by_instance_id("missing").is_err());
        assert!(h.events_by_workflow_name("missing").is_err());
    }

    #[test]
    fn filters_return_matching_events() {
        let h = decode(
            proto::HistoryPropagationScope::Lineage,
            vec![
                raw_chunk("app-a", "inst-a", "WfA", 2),
                raw_chunk("app-b", "inst-b", "WfB", 3),
            ],
        )
        .unwrap();

        assert_eq!(h.events_by_app_id("app-a").unwrap().len(), 2);
        assert_eq!(h.events_by_instance_id("inst-b").unwrap().len(), 3);
        assert_eq!(h.events_by_workflow_name("WfA").unwrap().len(), 2);
        assert_eq!(h.workflow_by_name("WfB").unwrap().instance_id, "inst-b");
    }
}