daedalus_runtime/executor/
payload.rs

1pub use daedalus_data::model::Value;
2#[cfg(feature = "gpu")]
3use daedalus_gpu::ErasedPayload;
4#[cfg(feature = "gpu")]
5use daedalus_gpu::GpuImageHandle;
6use std::any::Any;
7use std::sync::Arc;
8use std::time::Instant;
9
10/// Runtime payload carried over edges. Keep minimal and cheap to clone.
11///
12/// ```
13/// use daedalus_runtime::executor::EdgePayload;
14/// let payload = EdgePayload::Unit;
15/// assert!(matches!(payload, EdgePayload::Unit));
16/// ```
17#[derive(Clone, Debug)]
18pub enum EdgePayload {
19    Unit,
20    Bytes(Arc<[u8]>),
21    Value(Value),
22    Any(Arc<dyn Any + Send + Sync>),
23    #[cfg(feature = "gpu")]
24    Payload(ErasedPayload),
25    #[cfg(feature = "gpu")]
26    GpuImage(GpuImageHandle),
27}
28
29/// Correlated payload with a shared emission identifier.
30///
31/// ```
32/// use daedalus_runtime::executor::{CorrelatedPayload, EdgePayload};
33/// let correlated = CorrelatedPayload::from_edge(EdgePayload::Unit);
34/// assert!(correlated.correlation_id > 0);
35/// ```
36#[derive(Clone, Debug)]
37pub struct CorrelatedPayload {
38    pub correlation_id: u64,
39    pub inner: EdgePayload,
40    pub enqueued_at: Instant,
41}
42
43impl CorrelatedPayload {
44    /// Wrap an edge payload with a new correlation id.
45    pub fn from_edge(inner: EdgePayload) -> Self {
46        Self {
47            correlation_id: next_correlation_id(),
48            inner,
49            enqueued_at: Instant::now(),
50        }
51    }
52}
53
54/// Generate a new correlation id.
55///
56/// ```
57/// use daedalus_runtime::executor::next_correlation_id;
58/// let a = next_correlation_id();
59/// let b = next_correlation_id();
60/// assert!(b > a);
61/// ```
62pub fn next_correlation_id() -> u64 {
63    static CORR: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
64    CORR.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
65}