Skip to main content

pipedream_rs/
envelope.rs

1use std::any::{Any, TypeId};
2use std::sync::Arc;
3
4use crate::tracker::CompletionTracker;
5
6/// A type-erased envelope that can carry any `Send + Sync` value.
7#[derive(Clone)]
8pub struct Envelope {
9    pub value: Arc<dyn Any + Send + Sync>,
10    pub(crate) type_id: TypeId,
11    pub(crate) msg_id: u64,
12    pub(crate) tracker: Option<Arc<CompletionTracker>>,
13    /// Stream ID where this envelope originated (for echo prevention)
14    pub(crate) origin: u64,
15}
16
17impl Envelope {
18    /// Create a new envelope from any `Send + Sync` value.
19    pub fn new<T: 'static + Send + Sync>(
20        value: T,
21        msg_id: u64,
22        tracker: Option<Arc<CompletionTracker>>,
23    ) -> Self {
24        Self {
25            value: Arc::new(value),
26            type_id: TypeId::of::<T>(),
27            msg_id,
28            tracker,
29            origin: 0, // Will be set by the sending stream
30        }
31    }
32
33    /// Create a new envelope with a specific origin stream ID.
34    pub fn with_origin<T: 'static + Send + Sync>(
35        value: T,
36        msg_id: u64,
37        tracker: Option<Arc<CompletionTracker>>,
38        origin: u64,
39    ) -> Self {
40        Self {
41            value: Arc::new(value),
42            type_id: TypeId::of::<T>(),
43            msg_id,
44            tracker,
45            origin,
46        }
47    }
48
49    /// Create an envelope from an already type-erased value with known TypeId.
50    ///
51    /// This is useful when you have a value that's already been type-erased
52    /// (e.g., from a registry or dynamic dispatch) but want to preserve
53    /// its original type for downstream filtering.
54    pub fn from_any(
55        value: Arc<dyn Any + Send + Sync>,
56        type_id: TypeId,
57        msg_id: u64,
58        tracker: Option<Arc<CompletionTracker>>,
59    ) -> Self {
60        Self {
61            value,
62            type_id,
63            msg_id,
64            tracker,
65            origin: 0, // Will be set by the sending stream
66        }
67    }
68
69    /// Create an envelope from type-erased value with a specific origin.
70    pub fn from_any_with_origin(
71        value: Arc<dyn Any + Send + Sync>,
72        type_id: TypeId,
73        msg_id: u64,
74        tracker: Option<Arc<CompletionTracker>>,
75        origin: u64,
76    ) -> Self {
77        Self {
78            value,
79            type_id,
80            msg_id,
81            tracker,
82            origin,
83        }
84    }
85
86    /// Get the TypeId of the contained value.
87    pub fn type_id(&self) -> TypeId {
88        self.type_id
89    }
90
91    /// Get the message ID.
92    pub fn msg_id(&self) -> u64 {
93        self.msg_id
94    }
95
96    /// Get the completion tracker if present.
97    pub fn tracker(&self) -> Option<Arc<CompletionTracker>> {
98        self.tracker.clone()
99    }
100
101    /// Get the origin stream ID.
102    pub fn origin(&self) -> u64 {
103        self.origin
104    }
105
106    /// Create a copy of this envelope with a new origin.
107    pub fn with_new_origin(&self, origin: u64) -> Self {
108        Self {
109            value: self.value.clone(),
110            type_id: self.type_id,
111            msg_id: self.msg_id,
112            tracker: self.tracker.clone(),
113            origin,
114        }
115    }
116
117    /// Create a copy of this envelope without a tracker.
118    /// Used when forwarding events - the target stream has its own handlers.
119    pub fn without_tracker(&self) -> Self {
120        Self {
121            value: self.value.clone(),
122            type_id: self.type_id,
123            msg_id: self.msg_id,
124            tracker: None,
125            origin: self.origin,
126        }
127    }
128
129    /// Attempt to downcast to a concrete type reference.
130    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
131        self.value.downcast_ref()
132    }
133
134    /// Attempt to get an Arc of the concrete type.
135    pub fn downcast<T: 'static + Send + Sync>(&self) -> Option<Arc<T>> {
136        Arc::downcast(self.value.clone()).ok()
137    }
138
139    /// Check if the envelope contains a value of type T.
140    pub fn is<T: 'static>(&self) -> bool {
141        self.type_id == TypeId::of::<T>()
142    }
143}
144
145impl std::fmt::Debug for Envelope {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct("Envelope")
148            .field("type_id", &self.type_id)
149            .finish()
150    }
151}
152
// Unit tests for `Envelope`: construction, type checks, downcasting,
// origin handling and tracker propagation.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_envelope_creation() {
        let env = Envelope::new("hello".to_string(), 0, None);
        assert!(env.is::<String>());
        assert!(!env.is::<i32>());
        assert_eq!(env.type_id(), TypeId::of::<String>());
    }

    #[test]
    fn test_envelope_downcast() {
        let env = Envelope::new(42i32, 0, None);
        assert_eq!(env.downcast_ref::<i32>(), Some(&42));
        assert_eq!(env.downcast_ref::<String>(), None);
    }

    #[test]
    fn test_envelope_downcast_arc() {
        let env = Envelope::new(42i32, 0, None);
        let typed = env.downcast::<i32>().expect("payload is an i32");
        assert_eq!(*typed, 42);
        assert!(env.downcast::<String>().is_none());
    }

    #[test]
    fn test_envelope_clone() {
        let env1 = Envelope::new("test".to_string(), 0, None);
        let env2 = env1.clone();
        assert_eq!(env1.downcast_ref::<String>(), env2.downcast_ref::<String>());
        // Cloning shares the payload allocation rather than copying it.
        assert!(Arc::ptr_eq(&env1.value, &env2.value));
    }

    #[test]
    fn test_envelope_msg_id() {
        let env = Envelope::new("test".to_string(), 42, None);
        assert_eq!(env.msg_id(), 42);
    }

    #[test]
    fn test_envelope_tracker() {
        let tracker = Arc::new(CompletionTracker::new(1));
        let env = Envelope::new("test".to_string(), 0, Some(tracker.clone()));
        assert!(env.tracker().is_some());

        let env_no_tracker = Envelope::new("test".to_string(), 0, None);
        assert!(env_no_tracker.tracker().is_none());
    }

    #[test]
    fn test_envelope_origin() {
        let env = Envelope::new(1u8, 0, None);
        assert_eq!(env.origin(), 0);

        let env = Envelope::with_origin(1u8, 5, None, 7);
        assert_eq!(env.origin(), 7);

        // `with_new_origin` returns a copy and leaves the source untouched.
        let moved = env.with_new_origin(9);
        assert_eq!(moved.origin(), 9);
        assert_eq!(moved.msg_id(), 5);
        assert_eq!(env.origin(), 7);
        assert!(Arc::ptr_eq(&moved.value, &env.value));
    }

    #[test]
    fn test_envelope_without_tracker() {
        let tracker = Arc::new(CompletionTracker::new(1));
        let env = Envelope::with_origin("test".to_string(), 3, Some(tracker), 4);

        let bare = env.without_tracker();
        assert!(bare.tracker().is_none());
        assert_eq!(bare.msg_id(), 3);
        assert_eq!(bare.origin(), 4);
        assert!(bare.is::<String>());
        // The source envelope keeps its tracker.
        assert!(env.tracker().is_some());
    }

    #[test]
    fn test_envelope_from_any() {
        let payload: Arc<dyn Any + Send + Sync> = Arc::new(5u8);

        let env = Envelope::from_any(payload.clone(), TypeId::of::<u8>(), 1, None);
        assert!(env.is::<u8>());
        assert_eq!(env.origin(), 0);
        assert_eq!(env.downcast_ref::<u8>(), Some(&5));

        let env = Envelope::from_any_with_origin(payload, TypeId::of::<u8>(), 2, None, 6);
        assert_eq!(env.msg_id(), 2);
        assert_eq!(env.origin(), 6);
    }
}