Skip to main content

dapr_durabletask/api/
history_propagation.rs

1//! Workflow history propagation: scope enum, propagated history struct, and
2//! convenience filters.
3//!
4//! Dapr Workflows can propagate execution history from a parent workflow to
5//! its child workflows and activities. Two scopes are supported:
6//!
7//! * [`HistoryPropagationScope::OwnHistory`] — only the caller's events are
8//!   forwarded; ancestral history is dropped (a trust boundary).
9//! * [`HistoryPropagationScope::Lineage`] — the caller's events plus the full
10//!   ancestor chain are forwarded.
11//!
12//! Parent (schedule-side):
13//! ```ignore
14//! ctx.call_activity_with_options(
15//!     "verify",
16//!     input,
17//!     ActivityOptions::new().with_history_propagation(HistoryPropagationScope::Lineage),
18//! );
19//! ```
20//!
21//! Child / activity (receive-side):
22//! ```ignore
23//! if let Some(history) = ctx.propagated_history() {
24//!     for app in history.app_ids() {
25//!         println!("ancestor app: {app}");
26//!     }
27//! }
28//! ```
29
30use crate::proto;
31use crate::proto::prost::Message as _;
32
/// Selects which part of a workflow's history accompanies an activity or
/// child workflow that the workflow schedules.
///
/// This is the public counterpart of the proto `HistoryPropagationScope`
/// enum and hides its raw `i32` discriminants. The proto's default `None`
/// value deliberately has no variant here: a caller either names a scope
/// explicitly or leaves the option unset, and leaving it unset means that no
/// history is propagated.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HistoryPropagationScope {
    /// Propagate the caller's own history events and nothing else.
    ///
    /// History the caller itself received from its parent is not passed on,
    /// which makes this scope a trust boundary.
    OwnHistory,

    /// Propagate the caller's own history events together with the whole
    /// ancestor chain.
    ///
    /// Propagated history the caller received is forwarded as extra chunks
    /// next to the caller's own chunk.
    Lineage,
}
52
53impl From<HistoryPropagationScope> for proto::HistoryPropagationScope {
54    fn from(scope: HistoryPropagationScope) -> Self {
55        match scope {
56            HistoryPropagationScope::OwnHistory => proto::HistoryPropagationScope::OwnHistory,
57            HistoryPropagationScope::Lineage => proto::HistoryPropagationScope::Lineage,
58        }
59    }
60}
61
62impl TryFrom<proto::HistoryPropagationScope> for HistoryPropagationScope {
63    type Error = ();
64
65    fn try_from(scope: proto::HistoryPropagationScope) -> std::result::Result<Self, Self::Error> {
66        match scope {
67            proto::HistoryPropagationScope::OwnHistory => Ok(Self::OwnHistory),
68            proto::HistoryPropagationScope::Lineage => Ok(Self::Lineage),
69            proto::HistoryPropagationScope::None => Err(()),
70        }
71    }
72}
73
74impl HistoryPropagationScope {
75    /// Convert to the wire-format proto enum value.
76    pub(crate) fn to_proto(self) -> proto::HistoryPropagationScope {
77        self.into()
78    }
79
80    /// Convert from a proto enum value, returning `None` for `SCOPE_NONE` or
81    /// unknown variants.
82    pub(crate) fn from_proto(scope: proto::HistoryPropagationScope) -> Option<Self> {
83        Self::try_from(scope).ok()
84    }
85}
86
87/// A single per-app slice of propagated history.
88///
89/// One chunk corresponds to all events produced by a single workflow
90/// instance running on a single Dapr app. When `Lineage` is used, multiple
91/// chunks describe the full ancestor chain in execution order.
92#[derive(Clone, Debug)]
93pub struct PropagatedHistoryChunk {
94    /// The Dapr app ID that produced these events.
95    pub app_id: String,
96    /// The workflow instance ID that produced these events.
97    pub instance_id: String,
98    /// The workflow function name that produced these events.
99    pub workflow_name: String,
100    /// Index of the first event in this chunk relative to the propagated
101    /// stream as a whole.
102    pub start_event_index: i32,
103    /// Number of events in this chunk.
104    pub event_count: i32,
105    /// Decoded history events for this chunk, in execution order.
106    pub events: Vec<proto::HistoryEvent>,
107}
108
109/// Propagated execution history delivered to a child workflow or activity.
110///
111/// Use [`PropagatedHistory::events`] for the flat event stream, or any of the
112/// `events_by_*` / `workflow_by_name` filters to slice it by chunk metadata.
113#[derive(Clone, Debug)]
114pub struct PropagatedHistory {
115    /// The propagation scope the parent used when scheduling this work item.
116    pub scope: HistoryPropagationScope,
117    /// All propagated events flattened in execution order.
118    pub events: Vec<proto::HistoryEvent>,
119    /// Per-app/per-instance chunk metadata.
120    pub chunks: Vec<PropagatedHistoryChunk>,
121}
122
123/// Returned when a `*_by_name` filter cannot find a matching workflow or app.
124#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
125#[error("propagated history: {kind} '{name}' not found")]
126pub struct PropagationNotFoundError {
127    /// Human-readable kind of the missing entity ("workflow", "app id", etc.).
128    pub kind: &'static str,
129    /// The name that was searched for.
130    pub name: String,
131}
132
133impl PropagatedHistory {
134    /// Build a `PropagatedHistory` from the wire-format proto.
135    ///
136    /// Returns `None` if the proto's scope is `SCOPE_NONE` (no propagation
137    /// happened) or unknown. Each chunk's raw bytes are decoded once into
138    /// typed [`HistoryEvent`](proto::HistoryEvent)s; malformed events are
139    /// silently dropped (the chunk's `event_count` reflects the original wire
140    /// count so the caller can detect drift).
141    pub fn from_proto(p: proto::PropagatedHistory) -> Option<Self> {
142        let scope = HistoryPropagationScope::from_proto(
143            proto::HistoryPropagationScope::try_from(p.scope).ok()?,
144        )?;
145
146        let mut all_events = Vec::new();
147        let mut chunks = Vec::with_capacity(p.chunks.len());
148
149        for raw in p.chunks {
150            let start_event_index = all_events.len() as i32;
151            let mut decoded = Vec::with_capacity(raw.raw_events.len());
152            for ev_bytes in &raw.raw_events {
153                if let Ok(ev) = proto::HistoryEvent::decode(ev_bytes.as_slice()) {
154                    decoded.push(ev);
155                }
156            }
157            let event_count = raw.raw_events.len() as i32;
158            // Clone each decoded event into the flat stream, then move the
159            // owned Vec into the chunk — no double allocation of the Vec
160            // backing storage.
161            all_events.extend_from_slice(&decoded);
162            chunks.push(PropagatedHistoryChunk {
163                app_id: raw.app_id,
164                instance_id: raw.instance_id,
165                workflow_name: raw.workflow_name,
166                start_event_index,
167                event_count,
168                events: decoded,
169            });
170        }
171
172        Some(Self {
173            scope,
174            events: all_events,
175            chunks,
176        })
177    }
178
179    /// Deduplicated list of app IDs in the propagated chain, in chunk order
180    /// (earliest ancestor first).
181    pub fn app_ids(&self) -> Vec<String> {
182        let mut seen = std::collections::HashSet::new();
183        self.chunks
184            .iter()
185            .filter(|c| seen.insert(c.app_id.as_str()))
186            .map(|c| c.app_id.clone())
187            .collect()
188    }
189
190    /// Return the chunk produced by a workflow with the given function name.
191    ///
192    /// If multiple chunks share the same name (re-entrant ancestor calls) the
193    /// first match in chain order is returned.
194    pub fn workflow_by_name(
195        &self,
196        name: &str,
197    ) -> Result<&PropagatedHistoryChunk, PropagationNotFoundError> {
198        self.chunks
199            .iter()
200            .find(|c| c.workflow_name == name)
201            .ok_or_else(|| PropagationNotFoundError {
202                kind: "workflow",
203                name: name.to_string(),
204            })
205    }
206
207    /// All events from chunks tagged with the given Dapr app ID.
208    pub fn events_by_app_id(
209        &self,
210        app_id: &str,
211    ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
212        let mut out = Vec::new();
213        let mut found = false;
214        for c in &self.chunks {
215            if c.app_id == app_id {
216                found = true;
217                out.extend(c.events.iter().cloned());
218            }
219        }
220        if found {
221            Ok(out)
222        } else {
223            Err(PropagationNotFoundError {
224                kind: "app id",
225                name: app_id.to_string(),
226            })
227        }
228    }
229
230    /// All events from the chunk with the given instance ID.
231    pub fn events_by_instance_id(
232        &self,
233        instance_id: &str,
234    ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
235        let mut out = Vec::new();
236        let mut found = false;
237        for c in &self.chunks {
238            if c.instance_id == instance_id {
239                found = true;
240                out.extend(c.events.iter().cloned());
241            }
242        }
243        if found {
244            Ok(out)
245        } else {
246            Err(PropagationNotFoundError {
247                kind: "instance id",
248                name: instance_id.to_string(),
249            })
250        }
251    }
252
253    /// All events from chunks produced by a workflow with the given function
254    /// name.
255    pub fn events_by_workflow_name(
256        &self,
257        name: &str,
258    ) -> Result<Vec<proto::HistoryEvent>, PropagationNotFoundError> {
259        let mut out = Vec::new();
260        let mut found = false;
261        for c in &self.chunks {
262            if c.workflow_name == name {
263                found = true;
264                out.extend(c.events.iter().cloned());
265            }
266        }
267        if found {
268            Ok(out)
269        } else {
270            Err(PropagationNotFoundError {
271                kind: "workflow",
272                name: name.to_string(),
273            })
274        }
275    }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::prost::Message;

    /// Minimal history event that only carries an `event_id`.
    fn history_event(id: i32) -> proto::HistoryEvent {
        proto::HistoryEvent {
            event_id: id,
            timestamp: None,
            router: None,
            event_type: None,
        }
    }

    /// Wire-format chunk holding `n` encoded events with IDs `0..n`.
    fn wire_chunk(app: &str, inst: &str, wf: &str, n: i32) -> proto::PropagatedHistoryChunk {
        let mut raw_events = Vec::new();
        for id in 0..n {
            raw_events.push(history_event(id).encode_to_vec());
        }
        proto::PropagatedHistoryChunk {
            raw_events,
            app_id: app.to_string(),
            instance_id: inst.to_string(),
            workflow_name: wf.to_string(),
            raw_signatures: vec![],
            signing_cert_chains: vec![],
        }
    }

    /// Wire-format history with the given scope and chunks.
    fn wire_history(
        scope: proto::HistoryPropagationScope,
        chunks: Vec<proto::PropagatedHistoryChunk>,
    ) -> proto::PropagatedHistory {
        proto::PropagatedHistory {
            scope: scope as i32,
            chunks,
        }
    }

    #[test]
    fn from_proto_none_returns_none() {
        let wire = wire_history(proto::HistoryPropagationScope::None, vec![]);
        assert!(PropagatedHistory::from_proto(wire).is_none());
    }

    #[test]
    fn from_proto_decodes_chunks_and_flattens_events() {
        let wire = wire_history(
            proto::HistoryPropagationScope::Lineage,
            vec![
                wire_chunk("app-a", "inst-a", "WfA", 2),
                wire_chunk("app-b", "inst-b", "WfB", 3),
            ],
        );
        let history = PropagatedHistory::from_proto(wire).expect("scope set");

        assert_eq!(history.scope, HistoryPropagationScope::Lineage);
        assert_eq!(history.events.len(), 5);
        assert_eq!(history.chunks.len(), 2);

        let first = &history.chunks[0];
        assert_eq!(first.start_event_index, 0);
        assert_eq!(first.event_count, 2);

        let second = &history.chunks[1];
        assert_eq!(second.start_event_index, 2);
        assert_eq!(second.event_count, 3);
    }

    #[test]
    fn app_ids_are_deduplicated_in_chain_order() {
        let wire = wire_history(
            proto::HistoryPropagationScope::Lineage,
            vec![
                wire_chunk("app-a", "i1", "Wf1", 1),
                wire_chunk("app-b", "i2", "Wf2", 1),
                wire_chunk("app-a", "i3", "Wf3", 1),
            ],
        );
        let history = PropagatedHistory::from_proto(wire).unwrap();

        let expected = vec!["app-a".to_string(), "app-b".to_string()];
        assert_eq!(history.app_ids(), expected);
    }

    #[test]
    fn filters_return_not_found_for_missing_names() {
        let wire = wire_history(
            proto::HistoryPropagationScope::OwnHistory,
            vec![wire_chunk("app-a", "inst", "WfA", 1)],
        );
        let history = PropagatedHistory::from_proto(wire).unwrap();

        assert!(history.workflow_by_name("missing").is_err());
        assert!(history.events_by_app_id("missing").is_err());
        assert!(history.events_by_instance_id("missing").is_err());
        assert!(history.events_by_workflow_name("missing").is_err());
    }

    #[test]
    fn filters_return_matching_events() {
        let wire = wire_history(
            proto::HistoryPropagationScope::Lineage,
            vec![
                wire_chunk("app-a", "inst-a", "WfA", 2),
                wire_chunk("app-b", "inst-b", "WfB", 3),
            ],
        );
        let history = PropagatedHistory::from_proto(wire).unwrap();

        assert_eq!(history.events_by_app_id("app-a").unwrap().len(), 2);
        assert_eq!(history.events_by_instance_id("inst-b").unwrap().len(), 3);
        assert_eq!(history.events_by_workflow_name("WfA").unwrap().len(), 2);
        assert_eq!(
            history.workflow_by_name("WfB").unwrap().instance_id,
            "inst-b"
        );
    }
}
374}