daedalus_runtime/executor/
queue.rs1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3
4#[cfg(feature = "lockfree-queues")]
5use crossbeam_queue::ArrayQueue;
6
7use crate::plan::{BackpressureStrategy, EdgePolicyKind};
8
9use super::{CorrelatedPayload, ExecutionTelemetry};
10use super::payload_size_bytes;
11
12pub struct RingBuf {
14 buf: Vec<Option<CorrelatedPayload>>,
15 head: usize,
16 len: usize,
17}
18
19impl RingBuf {
20 pub fn new(cap: usize) -> Self {
21 Self {
22 buf: vec![None; cap.max(1)],
23 head: 0,
24 len: 0,
25 }
26 }
27
28 pub fn cap(&self) -> usize {
29 self.buf.len()
30 }
31
32 pub fn pop_front(&mut self) -> Option<CorrelatedPayload> {
33 if self.len == 0 {
34 return None;
35 }
36 let idx = self.head;
37 let out = self.buf[idx].take();
38 self.head = (self.head + 1) % self.cap();
39 self.len -= 1;
40 out
41 }
42
43 pub fn push_back(&mut self, payload: CorrelatedPayload) -> bool {
44 let mut dropped = false;
45 if self.len == self.cap() {
46 self.pop_front();
48 dropped = true;
49 }
50 let idx = (self.head + self.len) % self.cap();
51 self.buf[idx] = Some(payload);
52 self.len = (self.len + 1).min(self.cap());
53 dropped
54 }
55
56 pub fn is_full(&self) -> bool {
57 self.len == self.cap()
58 }
59
60 pub fn len(&self) -> usize {
61 self.len
62 }
63}
64
65pub enum EdgeQueue {
66 Deque(std::collections::VecDeque<CorrelatedPayload>),
67 Bounded { ring: RingBuf },
68}
69
70impl Default for EdgeQueue {
71 fn default() -> Self {
72 EdgeQueue::Deque(std::collections::VecDeque::new())
73 }
74}
75
76impl EdgeQueue {
77 pub(crate) fn pop_front(&mut self) -> Option<CorrelatedPayload> {
78 match self {
79 EdgeQueue::Deque(d) => d.pop_front(),
80 EdgeQueue::Bounded { ring } => ring.pop_front(),
81 }
82 }
83
84 pub fn ensure_policy(&mut self, policy: &EdgePolicyKind) {
85 match policy {
86 EdgePolicyKind::Bounded { cap } => match self {
87 EdgeQueue::Bounded { ring } => {
88 if ring.cap() != *cap {
89 *ring = RingBuf::new(*cap);
90 }
91 }
92 _ => {
93 *self = EdgeQueue::Bounded {
94 ring: RingBuf::new(*cap),
95 }
96 }
97 },
98 EdgePolicyKind::Fifo | EdgePolicyKind::Broadcast | EdgePolicyKind::NewestWins => {
99 if let EdgeQueue::Bounded { .. } = self {
100 *self = EdgeQueue::Deque(std::collections::VecDeque::new());
101 }
102 }
103 }
104 }
105
106 pub fn is_full(&self) -> bool {
107 match self {
108 EdgeQueue::Deque(_) => false,
109 EdgeQueue::Bounded { ring } => ring.is_full(),
110 }
111 }
112
113 pub fn len(&self) -> usize {
114 match self {
115 EdgeQueue::Deque(d) => d.len(),
116 EdgeQueue::Bounded { ring } => ring.len(),
117 }
118 }
119
120 pub fn push(&mut self, policy: &EdgePolicyKind, payload: CorrelatedPayload) -> bool {
121 match policy {
122 EdgePolicyKind::NewestWins => {
123 match self {
124 EdgeQueue::Deque(d) => {
125 d.clear();
126 d.push_back(payload);
127 }
128 EdgeQueue::Bounded { .. } => {
129 *self = EdgeQueue::Deque(std::collections::VecDeque::from([payload]));
130 }
131 }
132 false
133 }
134 EdgePolicyKind::Broadcast | EdgePolicyKind::Fifo => {
135 match self {
136 EdgeQueue::Deque(d) => d.push_back(payload),
137 EdgeQueue::Bounded { .. } => {
138 *self = EdgeQueue::Deque(std::collections::VecDeque::from([payload]));
139 }
140 }
141 false
142 }
143 EdgePolicyKind::Bounded { cap } => match self {
144 EdgeQueue::Bounded { ring } => ring.push_back(payload),
145 EdgeQueue::Deque(d) => {
146 let mut ring = RingBuf::new(*cap);
147 for p in d.drain(..) {
148 ring.push_back(p);
149 }
150 let dropped = ring.push_back(payload);
151 *self = EdgeQueue::Bounded { ring };
152 dropped
153 }
154 },
155 }
156 }
157}
158
159pub enum EdgeStorage {
161 Locked(Arc<Mutex<EdgeQueue>>),
162 #[cfg(feature = "lockfree-queues")]
163 BoundedLf(Arc<ArrayQueue<CorrelatedPayload>>),
164}
165
166pub fn build_queues(plan: &crate::plan::RuntimePlan) -> Vec<EdgeStorage> {
167 #[cfg(feature = "lockfree-queues")]
168 let use_lockfree = plan.lockfree_queues;
169 plan.edges
170 .iter()
171 .map(|(_, _, _, _, policy)| match policy {
172 EdgePolicyKind::Bounded { cap } => {
173 #[cfg(feature = "lockfree-queues")]
174 {
175 if use_lockfree {
176 EdgeStorage::BoundedLf(Arc::new(ArrayQueue::new(*cap)))
177 } else {
178 EdgeStorage::Locked(Arc::new(Mutex::new(EdgeQueue::Bounded {
179 ring: RingBuf::new(*cap),
180 })))
181 }
182 }
183 #[cfg(not(feature = "lockfree-queues"))]
184 {
185 EdgeStorage::Locked(Arc::new(Mutex::new(EdgeQueue::Bounded {
186 ring: RingBuf::new(*cap),
187 })))
188 }
189 }
190 _ => EdgeStorage::Locked(Arc::new(Mutex::new(EdgeQueue::default()))),
191 })
192 .collect()
193}
194
195#[allow(clippy::too_many_arguments)]
196pub fn apply_policy(
197 edge_idx: usize,
198 policy: &EdgePolicyKind,
199 payload: &CorrelatedPayload,
200 queues: &Arc<Vec<EdgeStorage>>,
201 warnings_seen: &Arc<Mutex<std::collections::HashSet<String>>>,
202 telem: &mut ExecutionTelemetry,
203 warning_label: Option<String>,
204 backpressure: BackpressureStrategy,
205) {
206 if let Some(storage) = queues.get(edge_idx) {
207 let payload_bytes = if cfg!(feature = "metrics")
208 && telem.metrics_level.is_detailed()
209 {
210 payload_size_bytes(&payload.inner)
211 } else {
212 None
213 };
214 telem.record_edge_payload(edge_idx, payload_bytes);
215 match storage {
216 EdgeStorage::Locked(q_arc) => {
217 if let Ok(mut q) = q_arc.lock() {
218 q.ensure_policy(policy);
219 let dropped = match (policy, backpressure) {
220 (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::BoundedQueues)
221 if q.is_full() =>
222 {
223 true
224 }
225 (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::ErrorOnOverflow)
226 if q.is_full() =>
227 {
228 telem.backpressure_events += 1;
229 let label = warning_label
230 .clone()
231 .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
232 record_warning(&label, warnings_seen, telem);
233 return;
234 }
235 _ => {
236 let mut payload = payload.clone();
237 payload.enqueued_at = Instant::now();
238 q.push(policy, payload)
239 }
240 };
241 if dropped {
242 telem.backpressure_events += 1;
243 let label = warning_label
244 .clone()
245 .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
246 record_warning(&label, warnings_seen, telem);
247 }
248 telem.record_edge_depth(edge_idx, q.len());
249 }
250 }
251 #[cfg(feature = "lockfree-queues")]
252 EdgeStorage::BoundedLf(q) => {
253 let mut dropped = false;
254 match backpressure {
255 BackpressureStrategy::BoundedQueues => {
256 if q.is_full() {
257 dropped = true;
258 } else {
259 let mut payload = payload.clone();
260 payload.enqueued_at = Instant::now();
261 q.push(payload).unwrap();
262 }
263 }
264 BackpressureStrategy::ErrorOnOverflow => {
265 if q.is_full() {
266 telem.backpressure_events += 1;
267 let label = warning_label
268 .clone()
269 .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
270 record_warning(&label, warnings_seen, telem);
271 return;
272 } else {
273 let mut payload = payload.clone();
274 payload.enqueued_at = Instant::now();
275 q.push(payload).unwrap();
276 }
277 }
278 BackpressureStrategy::None => {
279 let mut payload = payload.clone();
280 payload.enqueued_at = Instant::now();
281 if q.push(payload.clone()).is_err() {
282 let _ = q.pop();
283 let _ = q.push(payload.clone());
284 dropped = true;
285 }
286 }
287 }
288 if dropped {
289 telem.backpressure_events += 1;
290 let label = warning_label
291 .clone()
292 .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
293 record_warning(&label, warnings_seen, telem);
294 }
295 telem.record_edge_depth(edge_idx, q.len());
296 }
297 }
298 }
299}
300
301pub struct ApplyPolicyOwnedArgs<'a> {
302 pub edge_idx: usize,
303 pub policy: &'a EdgePolicyKind,
304 pub payload: CorrelatedPayload,
305 pub queues: &'a Arc<Vec<EdgeStorage>>,
306 pub warnings_seen: &'a Arc<Mutex<std::collections::HashSet<String>>>,
307 pub telem: &'a mut ExecutionTelemetry,
308 pub warning_label: Option<String>,
309 pub backpressure: BackpressureStrategy,
310}
311
312pub fn apply_policy_owned(args: ApplyPolicyOwnedArgs<'_>) {
313 let ApplyPolicyOwnedArgs {
314 edge_idx,
315 policy,
316 mut payload,
317 queues,
318 warnings_seen,
319 telem,
320 warning_label,
321 backpressure,
322 } = args;
323 if let Some(storage) = queues.get(edge_idx) {
324 let payload_bytes = if cfg!(feature = "metrics")
325 && telem.metrics_level.is_detailed()
326 {
327 payload_size_bytes(&payload.inner)
328 } else {
329 None
330 };
331 telem.record_edge_payload(edge_idx, payload_bytes);
332 match storage {
333 EdgeStorage::Locked(q_arc) => {
334 if let Ok(mut q) = q_arc.lock() {
335 q.ensure_policy(policy);
336 let dropped = match (policy, backpressure) {
337 (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::BoundedQueues)
338 if q.is_full() =>
339 {
340 true
341 }
342 (EdgePolicyKind::Bounded { .. }, BackpressureStrategy::ErrorOnOverflow)
343 if q.is_full() =>
344 {
345 telem.backpressure_events += 1;
346 let label = warning_label
347 .clone()
348 .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
349 record_warning(&label, warnings_seen, telem);
350 return;
351 }
352 _ => {
353 payload.enqueued_at = Instant::now();
354 q.push(policy, payload)
355 }
356 };
357 if dropped {
358 telem.backpressure_events += 1;
359 let label = warning_label
360 .clone()
361 .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
362 record_warning(&label, warnings_seen, telem);
363 }
364 telem.record_edge_depth(edge_idx, q.len());
365 }
366 }
367 #[cfg(feature = "lockfree-queues")]
368 EdgeStorage::BoundedLf(q) => {
369 let mut dropped = false;
370 match backpressure {
371 BackpressureStrategy::BoundedQueues => {
372 if q.is_full() {
373 dropped = true;
374 } else {
375 payload.enqueued_at = Instant::now();
376 q.push(payload).unwrap();
377 }
378 }
379 BackpressureStrategy::ErrorOnOverflow => {
380 if q.is_full() {
381 telem.backpressure_events += 1;
382 let label = warning_label
383 .clone()
384 .unwrap_or_else(|| format!("bounded_error_edge_{edge_idx}"));
385 record_warning(&label, warnings_seen, telem);
386 return;
387 } else {
388 payload.enqueued_at = Instant::now();
389 q.push(payload).unwrap();
390 }
391 }
392 BackpressureStrategy::None => {
393 payload.enqueued_at = Instant::now();
394 if q.push(payload).is_err() {
395 dropped = true;
396 }
397 }
398 }
399 if dropped {
400 telem.backpressure_events += 1;
401 let label = warning_label
402 .clone()
403 .unwrap_or_else(|| format!("bounded_drop_edge_{edge_idx}"));
404 record_warning(&label, warnings_seen, telem);
405 }
406 telem.record_edge_depth(edge_idx, q.len());
407 }
408 #[cfg(feature = "lockfree-queues")]
409 EdgeStorage::UnboundedLf(q) => {
410 payload.enqueued_at = Instant::now();
411 q.push(payload).unwrap();
412 telem.record_edge_depth(edge_idx, q.len());
413 }
414 }
415 }
416}
417
418fn record_warning(
419 label: &str,
420 seen: &Arc<Mutex<std::collections::HashSet<String>>>,
421 telem: &mut ExecutionTelemetry,
422) {
423 if let Ok(mut s) = seen.lock()
424 && s.insert(label.to_string())
425 {
426 telem.warnings.push(label.to_string());
427 }
428}