// daedalus_runtime/executor/queue.rs

1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3
4#[cfg(feature = "lockfree-queues")]
5use crossbeam_queue::ArrayQueue;
6
7use crate::plan::{BackpressureStrategy, EdgePolicyKind};
8
9use super::{CorrelatedPayload, ExecutionTelemetry};
10use super::payload_size_bytes;
11
/// Simple ring buffer for bounded queues.
///
/// Fixed-capacity FIFO backed by a `Vec<Option<_>>`. When full,
/// `push_back` evicts the oldest element (drop-oldest semantics).
pub struct RingBuf {
    // Slot storage; `None` marks an empty slot. Capacity is `buf.len()`.
    buf: Vec<Option<CorrelatedPayload>>,
    // Index of the oldest element (the next one `pop_front` returns).
    head: usize,
    // Number of occupied slots; invariant: `len <= buf.len()`.
    len: usize,
}
18
19impl RingBuf {
20    pub fn new(cap: usize) -> Self {
21        Self {
22            buf: vec![None; cap.max(1)],
23            head: 0,
24            len: 0,
25        }
26    }
27
28    pub fn cap(&self) -> usize {
29        self.buf.len()
30    }
31
32    pub fn pop_front(&mut self) -> Option<CorrelatedPayload> {
33        if self.len == 0 {
34            return None;
35        }
36        let idx = self.head;
37        let out = self.buf[idx].take();
38        self.head = (self.head + 1) % self.cap();
39        self.len -= 1;
40        out
41    }
42
43    pub fn push_back(&mut self, payload: CorrelatedPayload) -> bool {
44        let mut dropped = false;
45        if self.len == self.cap() {
46            // drop oldest
47            self.pop_front();
48            dropped = true;
49        }
50        let idx = (self.head + self.len) % self.cap();
51        self.buf[idx] = Some(payload);
52        self.len = (self.len + 1).min(self.cap());
53        dropped
54    }
55
56    pub fn is_full(&self) -> bool {
57        self.len == self.cap()
58    }
59
60    pub fn len(&self) -> usize {
61        self.len
62    }
63}
64
/// Per-edge queue: an unbounded `VecDeque` for FIFO-style policies, or a
/// fixed-capacity `RingBuf` for `Bounded` edges.
pub enum EdgeQueue {
    // Unbounded queue used by Fifo / Broadcast / NewestWins policies.
    Deque(std::collections::VecDeque<CorrelatedPayload>),
    // Bounded queue with drop-oldest overflow behavior.
    Bounded { ring: RingBuf },
}
69
70impl Default for EdgeQueue {
71    fn default() -> Self {
72        EdgeQueue::Deque(std::collections::VecDeque::new())
73    }
74}
75
76impl EdgeQueue {
77    pub(crate) fn pop_front(&mut self) -> Option<CorrelatedPayload> {
78        match self {
79            EdgeQueue::Deque(d) => d.pop_front(),
80            EdgeQueue::Bounded { ring } => ring.pop_front(),
81        }
82    }
83
84    pub fn ensure_policy(&mut self, policy: &EdgePolicyKind) {
85        match policy {
86            EdgePolicyKind::Bounded { cap } => match self {
87                EdgeQueue::Bounded { ring } => {
88                    if ring.cap() != *cap {
89                        *ring = RingBuf::new(*cap);
90                    }
91                }
92                _ => {
93                    *self = EdgeQueue::Bounded {
94                        ring: RingBuf::new(*cap),
95                    }
96                }
97            },
98            EdgePolicyKind::Fifo | EdgePolicyKind::Broadcast | EdgePolicyKind::NewestWins => {
99                if let EdgeQueue::Bounded { .. } = self {
100                    *self = EdgeQueue::Deque(std::collections::VecDeque::new());
101                }
102            }
103        }
104    }
105
106    pub fn is_full(&self) -> bool {
107        match self {
108            EdgeQueue::Deque(_) => false,
109            EdgeQueue::Bounded { ring } => ring.is_full(),
110        }
111    }
112
113    pub fn len(&self) -> usize {
114        match self {
115            EdgeQueue::Deque(d) => d.len(),
116            EdgeQueue::Bounded { ring } => ring.len(),
117        }
118    }
119
120    pub fn push(&mut self, policy: &EdgePolicyKind, payload: CorrelatedPayload) -> bool {
121        match policy {
122            EdgePolicyKind::NewestWins => {
123                match self {
124                    EdgeQueue::Deque(d) => {
125                        d.clear();
126                        d.push_back(payload);
127                    }
128                    EdgeQueue::Bounded { .. } => {
129                        *self = EdgeQueue::Deque(std::collections::VecDeque::from([payload]));
130                    }
131                }
132                false
133            }
134            EdgePolicyKind::Broadcast | EdgePolicyKind::Fifo => {
135                match self {
136                    EdgeQueue::Deque(d) => d.push_back(payload),
137                    EdgeQueue::Bounded { .. } => {
138                        *self = EdgeQueue::Deque(std::collections::VecDeque::from([payload]));
139                    }
140                }
141                false
142            }
143            EdgePolicyKind::Bounded { cap } => match self {
144                EdgeQueue::Bounded { ring } => ring.push_back(payload),
145                EdgeQueue::Deque(d) => {
146                    let mut ring = RingBuf::new(*cap);
147                    for p in d.drain(..) {
148                        ring.push_back(p);
149                    }
150                    let dropped = ring.push_back(payload);
151                    *self = EdgeQueue::Bounded { ring };
152                    dropped
153                }
154            },
155        }
156    }
157}
158
/// Storage wrapper per edge; allows swapping queue implementations.
pub enum EdgeStorage {
    // Mutex-protected queue; handles every policy kind.
    Locked(Arc<Mutex<EdgeQueue>>),
    // Lock-free bounded queue, used for `Bounded` edges when the plan
    // opts into lock-free queues (see `build_queues`).
    #[cfg(feature = "lockfree-queues")]
    BoundedLf(Arc<ArrayQueue<CorrelatedPayload>>),
}
165
166pub fn build_queues(plan: &crate::plan::RuntimePlan) -> Vec<EdgeStorage> {
167    #[cfg(feature = "lockfree-queues")]
168    let use_lockfree = plan.lockfree_queues;
169    plan.edges
170        .iter()
171        .map(|(_, _, _, _, policy)| match policy {
172            EdgePolicyKind::Bounded { cap } => {
173                #[cfg(feature = "lockfree-queues")]
174                {
175                    if use_lockfree {
176                        EdgeStorage::BoundedLf(Arc::new(ArrayQueue::new(*cap)))
177                    } else {
178                        EdgeStorage::Locked(Arc::new(Mutex::new(EdgeQueue::Bounded {
179                            ring: RingBuf::new(*cap),
180                        })))
181                    }
182                }
183                #[cfg(not(feature = "lockfree-queues"))]
184                {
185                    EdgeStorage::Locked(Arc::new(Mutex::new(EdgeQueue::Bounded {
186                        ring: RingBuf::new(*cap),
187                    })))
188                }
189            }
190            _ => EdgeStorage::Locked(Arc::new(Mutex::new(EdgeQueue::default()))),
191        })
192        .collect()
193}
194
195#[allow(clippy::too_many_arguments)]
196pub fn apply_policy(
197    edge_idx: usize,
198    policy: &EdgePolicyKind,
199    payload: &CorrelatedPayload,
200    queues: &Arc<Vec<EdgeStorage>>,
201    warnings_seen: &Arc<Mutex<std::collections::HashSet<String>>>,
202    telem: &mut ExecutionTelemetry,
203    warning_label: Option<String>,
204    backpressure: BackpressureStrategy,
205) {
206    if let Some(storage) = queues.get(edge_idx) {
207        let payload_bytes = if cfg!(feature = "metrics")
208            && telem.metrics_level.is_detailed()
209        {
210            payload_size_bytes(&payload.inner)
211        } else {
212            None
213        };
214        telem.record_edge_payload(edge_idx, payload_bytes);
215        match storage {
216            EdgeStorage::Locked(q_arc) => {
217                if let Ok(mut q) = q_arc.lock() {
218                    q.ensure_policy(policy);
219                    let dropped = match (policy, backpressure) {
220                        (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::BoundedQueues)
221                            if q.is_full() =>
222                        {
223                            true
224                        }
225                        (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::ErrorOnOverflow)
226                            if q.is_full() =>
227                        {
228                            telem.backpressure_events += 1;
229                            let label = warning_label
230                                .clone()
231                                .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
232                            record_warning(&label, warnings_seen, telem);
233                            return;
234                        }
235                        _ => {
236                            let mut payload = payload.clone();
237                            payload.enqueued_at = Instant::now();
238                            q.push(policy, payload)
239                        }
240                    };
241                    if dropped {
242                        telem.backpressure_events += 1;
243                        let label = warning_label
244                            .clone()
245                            .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
246                        record_warning(&label, warnings_seen, telem);
247                    }
248                    telem.record_edge_depth(edge_idx, q.len());
249                }
250            }
251            #[cfg(feature = "lockfree-queues")]
252            EdgeStorage::BoundedLf(q) => {
253                let mut dropped = false;
254                match backpressure {
255                    BackpressureStrategy::BoundedQueues => {
256                        if q.is_full() {
257                            dropped = true;
258                        } else {
259                            let mut payload = payload.clone();
260                            payload.enqueued_at = Instant::now();
261                            q.push(payload).unwrap();
262                        }
263                    }
264                    BackpressureStrategy::ErrorOnOverflow => {
265                        if q.is_full() {
266                            telem.backpressure_events += 1;
267                            let label = warning_label
268                                .clone()
269                                .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
270                            record_warning(&label, warnings_seen, telem);
271                            return;
272                        } else {
273                            let mut payload = payload.clone();
274                            payload.enqueued_at = Instant::now();
275                            q.push(payload).unwrap();
276                        }
277                    }
278                    BackpressureStrategy::None => {
279                        let mut payload = payload.clone();
280                        payload.enqueued_at = Instant::now();
281                        if q.push(payload.clone()).is_err() {
282                            let _ = q.pop();
283                            let _ = q.push(payload.clone());
284                            dropped = true;
285                        }
286                    }
287                }
288                if dropped {
289                    telem.backpressure_events += 1;
290                    let label = warning_label
291                        .clone()
292                        .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
293                    record_warning(&label, warnings_seen, telem);
294                }
295                telem.record_edge_depth(edge_idx, q.len());
296            }
297        }
298    }
299}
300
/// Argument bundle for [`apply_policy_owned`]; mirrors the parameters of
/// [`apply_policy`] but takes ownership of the payload so the common
/// (non-dropping) path can enqueue without cloning.
pub struct ApplyPolicyOwnedArgs<'a> {
    pub edge_idx: usize,
    pub policy: &'a EdgePolicyKind,
    // Owned payload: moved into the queue on the success path.
    pub payload: CorrelatedPayload,
    pub queues: &'a Arc<Vec<EdgeStorage>>,
    // Labels already warned about, for deduplication.
    pub warnings_seen: &'a Arc<Mutex<std::collections::HashSet<String>>>,
    pub telem: &'a mut ExecutionTelemetry,
    pub warning_label: Option<String>,
    pub backpressure: BackpressureStrategy,
}
311
312pub fn apply_policy_owned(args: ApplyPolicyOwnedArgs<'_>) {
313    let ApplyPolicyOwnedArgs {
314        edge_idx,
315        policy,
316        mut payload,
317        queues,
318        warnings_seen,
319        telem,
320        warning_label,
321        backpressure,
322    } = args;
323    if let Some(storage) = queues.get(edge_idx) {
324        let payload_bytes = if cfg!(feature = "metrics")
325            && telem.metrics_level.is_detailed()
326        {
327            payload_size_bytes(&payload.inner)
328        } else {
329            None
330        };
331        telem.record_edge_payload(edge_idx, payload_bytes);
332        match storage {
333            EdgeStorage::Locked(q_arc) => {
334                if let Ok(mut q) = q_arc.lock() {
335                    q.ensure_policy(policy);
336                    let dropped = match (policy, backpressure) {
337                        (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::BoundedQueues)
338                            if q.is_full() =>
339                        {
340                            true
341                        }
342                        (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::ErrorOnOverflow)
343                            if q.is_full() =>
344                        {
345                            telem.backpressure_events += 1;
346                            let label = warning_label
347                                .clone()
348                                .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
349                            record_warning(&label, warnings_seen, telem);
350                            return;
351                        }
352                        _ => {
353                            payload.enqueued_at = Instant::now();
354                            q.push(policy, payload)
355                        }
356                    };
357                    if dropped {
358                        telem.backpressure_events += 1;
359                        let label = warning_label
360                            .clone()
361                            .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
362                        record_warning(&label, warnings_seen, telem);
363                    }
364                    telem.record_edge_depth(edge_idx, q.len());
365                }
366            }
367            #[cfg(feature = "lockfree-queues")]
368            EdgeStorage::BoundedLf(q) => {
369                let mut dropped = false;
370                match backpressure {
371                    BackpressureStrategy::BoundedQueues => {
372                        if q.is_full() {
373                            dropped = true;
374                        } else {
375                            payload.enqueued_at = Instant::now();
376                            q.push(payload).unwrap();
377                        }
378                    }
379                    BackpressureStrategy::ErrorOnOverflow => {
380                        if q.is_full() {
381                            telem.backpressure_events += 1;
382                            let label = warning_label
383                                .clone()
384                                .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
385                            record_warning(&label, warnings_seen, telem);
386                            return;
387                        } else {
388                            payload.enqueued_at = Instant::now();
389                            q.push(payload).unwrap();
390                        }
391                    }
392                    BackpressureStrategy::None => {
393                        payload.enqueued_at = Instant::now();
394                        if q.push(payload).is_err() {
395                            dropped = true;
396                        }
397                    }
398                }
399                if dropped {
400                    telem.backpressure_events += 1;
401                    let label = warning_label
402                        .clone()
403                        .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
404                    record_warning(&label, warnings_seen, telem);
405                }
406                telem.record_edge_depth(edge_idx, q.len());
407            }
408            #[cfg(feature = "lockfree-queues")]
409            EdgeStorage::UnboundedLf(q) => {
410                payload.enqueued_at = Instant::now();
411                q.push(payload).unwrap();
412                telem.record_edge_depth(edge_idx, q.len());
413            }
414        }
415    }
416}
417
418fn record_warning(
419    label: &str,
420    seen: &Arc<Mutex<std::collections::HashSet<String>>>,
421    telem: &mut ExecutionTelemetry,
422) {
423    if let Ok(mut s) = seen.lock()
424        && s.insert(label.to_string())
425    {
426        telem.warnings.push(label.to_string());
427    }
428}