dfir_rs 0.16.0

DFIR runtime for Rust, used by Hydro.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
//! Module for the inline DFIR runtime context and execution engine.
//!
//! Provides [`Context`] (the lightweight operator context) and
//! [`Dfir`] (the dataflow execution wrapper).

use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::task::Wake;

#[cfg(feature = "meta")]
use dfir_lang::diagnostic::{Diagnostic, Diagnostics, SerdeSpan};
#[cfg(feature = "meta")]
use dfir_lang::graph::DfirGraph;

use super::metrics::{DfirMetrics, DfirMetricsIntervals};
use super::state::StateHandle;
use super::{StateLifespan, StateTag, SubgraphId};
use crate::scheduled::ticks::TickInstant;
use crate::util::slot_vec::SlotVec;

/// Internal state storage for operator accumulators.
///
/// One entry is created per [`Context::add_state`] call; the hook and lifespan are filled in
/// afterwards by [`Context::set_state_lifespan_hook`].
struct StateData {
    /// The type-erased state value, boxed by [`Context::add_state`].
    state: Box<dyn Any>,
    /// Optional reset hook. [`Context::__end_tick`] invokes it for entries whose `lifespan` is
    /// [`StateLifespan::Tick`].
    lifespan_hook_fn: Option<LifespanResetFn>,
    /// `None` for static (this is the value [`Context::add_state`] starts with).
    lifespan: Option<StateLifespan>,
}
/// Type-erased reset hook; receives the stored state as `&mut dyn Any`.
type LifespanResetFn = Box<dyn FnMut(&mut dyn Any)>;

/// Coordinates waking between [`Context`] (inside the tick closure) and [`Dfir`]
/// (the external runner). Shared via `Arc` between both.
///
/// When external data arrives (e.g., a tokio stream receives a message), the waker returned by
/// [`Context::waker`] fires, which sets `can_start_tick` and wakes the [`Dfir::run`] task so it
/// starts a new tick.
/// Implements [`Wake`] directly so it can be used as a `Waker` without an extra wrapper.
#[doc(hidden)]
pub struct WakeState {
    /// Set to `true` when external data arrives, signaling that a new tick should run.
    /// Read and cleared by [`Dfir::run_tick`] and [`Dfir::run_available`] (and their `_sync`
    /// variants); polled by [`Dfir::run`] while idle.
    can_start_tick: std::sync::atomic::AtomicBool,
    /// Wakes the [`Dfir::run`] task from its idle `poll_fn` sleep.
    task_waker: futures::task::AtomicWaker,
}

impl Default for WakeState {
    /// Builds a wake state with no pending tick request and no registered task waker.
    fn default() -> Self {
        use futures::task::AtomicWaker;
        use std::sync::atomic::AtomicBool;

        let can_start_tick = AtomicBool::new(false);
        let task_waker = AtomicWaker::new();
        Self {
            can_start_tick,
            task_waker,
        }
    }
}

impl Wake for WakeState {
    /// Consuming wake: delegates to [`Wake::wake_by_ref`].
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    /// Flags that a new tick may start, then wakes whichever task is registered on
    /// `task_waker`. The flag is stored before the wake so the woken task observes it.
    fn wake_by_ref(self: &Arc<Self>) {
        let Self {
            can_start_tick,
            task_waker,
        } = &**self;
        can_start_tick.store(true, Ordering::Relaxed);
        task_waker.wake();
    }
}

/// A lightweight context for inline codegen that avoids the overhead of the full
/// (non-inline) runtime context (no tokio channels, no scheduler queues, no loop machinery).
///
/// Exposes the same method names that operator-generated code calls on both
/// `df` (for prologues: `add_state`, `set_state_lifespan_hook`) and
/// `context` (for iterators: `state_ref_unchecked`, `is_first_run_this_tick`, etc.).
#[doc(hidden)]
#[derive(Default)]
pub struct Context {
    /// Storage for the operator-facing State API, keyed by the id inside each `StateHandle`.
    states: SlotVec<StateTag, StateData>,
    /// Counter for number of ticks run; advanced by one tick in [`Self::__end_tick`].
    current_tick: TickInstant,
    /// Coordinates waking between [`Context`] (inside the tick closure) and [`Dfir`]
    /// (the external runner). Shared via `Arc` between both. Implements [`Wake`].
    wake_state: Arc<WakeState>,
    /// Live-updating DFIR runtime metrics via interior mutability.
    metrics: Rc<DfirMetrics>,
    /// Tasks buffered via [`Self::request_task`], spawned by [`Dfir::spawn_tasks`]
    /// once the runtime is running inside a tokio `LocalSet`.
    tasks_to_spawn: Vec<Pin<Box<dyn Future<Output = ()> + 'static>>>,
}

impl Context {
    /// Create a new inline context with shared wake state and metrics.
    ///
    /// The context starts with no registered state, the default tick instant, and an empty
    /// buffer of tasks to spawn.
    pub fn new(wake_state: Arc<WakeState>, metrics: Rc<DfirMetrics>) -> Self {
        Self {
            states: SlotVec::new(),
            current_tick: TickInstant::default(),
            wake_state,
            metrics,
            tasks_to_spawn: Vec::new(),
        }
    }

    // --- Methods called as `df.xxx()` in operator prologues ---

    /// Adds state and returns a handle.
    ///
    /// The state is stored type-erased with no lifespan hook; use
    /// [`Self::set_state_lifespan_hook`] to attach one.
    pub fn add_state<T>(&mut self, state: T) -> StateHandle<T>
    where
        T: Any,
    {
        let state_data = StateData {
            state: Box::new(state),
            lifespan_hook_fn: None,
            lifespan: None,
        };
        let state_id = self.states.insert(state_data);
        StateHandle {
            state_id,
            _phantom: PhantomData,
        }
    }

    /// Sets a hook to modify state at the end of each tick.
    ///
    /// The hook and `lifespan` replace any previously set for this state. Only hooks
    /// registered with [`StateLifespan::Tick`] are invoked by [`Self::__end_tick`].
    ///
    /// # Panics
    /// Panics if `handle` does not refer to a state stored in this instance. The installed
    /// hook panics when invoked if the stored state is not a `T`.
    pub fn set_state_lifespan_hook<T>(
        &mut self,
        handle: StateHandle<T>,
        lifespan: StateLifespan,
        mut hook_fn: impl 'static + FnMut(&mut T),
    ) where
        T: Any,
    {
        let state_data = self
            .states
            .get_mut(handle.state_id)
            .expect("Failed to find state with given handle.");
        // Wrap the typed hook so it can be stored alongside the type-erased state.
        state_data.lifespan_hook_fn = Some(Box::new(move |state| {
            let typed_state = state
                .downcast_mut::<T>()
                .expect("State type does not match the type of its handle.");
            (hook_fn)(typed_state);
        }));
        state_data.lifespan = Some(lifespan);
    }

    /// Buffers an async task to be spawned later by [`Dfir::spawn_tasks`].
    ///
    /// Tasks are deferred because `write_prologue` runs during graph construction,
    /// which may occur before a tokio `LocalSet` is entered. Buffered tasks are
    /// drained and spawned via `tokio::task::spawn_local` at the start of
    /// [`Dfir::run_tick`]. Tasks requested after that point remain buffered until
    /// the next call to [`Dfir::run_tick`].
    pub fn request_task<Fut>(&mut self, future: Fut)
    where
        Fut: Future<Output = ()> + 'static,
    {
        self.tasks_to_spawn.push(Box::pin(future));
    }

    // --- Methods called as `context.xxx()` in operator iterators ---

    /// Returns a shared reference to the state.
    ///
    /// # Safety
    /// `StateHandle<T>` must be from _this_ instance.
    ///
    /// # Panics
    /// Panics if no state is stored under the handle's id. In debug builds, also panics if
    /// the stored state is not a `T`.
    pub unsafe fn state_ref_unchecked<T>(&self, handle: StateHandle<T>) -> &'_ T
    where
        T: Any,
    {
        let state = self
            .states
            .get(handle.state_id)
            .expect("Failed to find state with given handle.")
            .state
            .as_ref();
        debug_assert!(state.is::<T>());
        // SAFETY: the caller guarantees `handle` was returned by `add_state::<T>` on this
        // instance, so the boxed value behind `state` is a `T`. Dropping the vtable and
        // reading the data pointer as `*const T` is therefore valid for the borrow of `self`.
        unsafe { &*(state as *const dyn Any as *const T) }
    }

    /// Always returns `true` in inline mode. The inline codegen runs the entire DAG
    /// once per tick with no re-execution, so every subgraph is always on its first
    /// (and only) run within each tick.
    pub fn is_first_run_this_tick(&self) -> bool {
        true
    }

    /// Gets the current tick count.
    pub fn current_tick(&self) -> TickInstant {
        self.current_tick
    }

    /// Returns a reference to the runtime metrics.
    pub fn metrics(&self) -> &Rc<DfirMetrics> {
        &self.metrics
    }

    /// Always returns the placeholder `SubgraphId::from_raw(0)`: inline mode has no
    /// subgraph scheduling, so there is no meaningful "current" subgraph.
    pub fn current_subgraph(&self) -> SubgraphId {
        SubgraphId::from_raw(0)
    }

    /// In inline mode, every subgraph runs unconditionally each tick, so the `sg_id`
    /// parameter is ignored. Only `is_external` matters: when `true`, it signals that
    /// external data has arrived and a new tick should be started.
    pub fn schedule_subgraph(&self, _sg_id: SubgraphId, is_external: bool) {
        if is_external {
            self.wake_state.wake_by_ref();
        }
    }

    /// Returns a waker that signals external data has arrived.
    pub fn waker(&self) -> std::task::Waker {
        std::task::Waker::from(self.wake_state.clone())
    }

    /// Runs end-of-tick state hooks and increments the tick counter.
    /// Called by the generated tick closure at the end of each tick.
    ///
    /// Only states that have a hook registered with [`StateLifespan::Tick`] are visited;
    /// every other state is left untouched.
    #[doc(hidden)]
    pub fn __end_tick(&mut self) {
        for state_data in self.states.values_mut() {
            let StateData {
                state,
                lifespan_hook_fn: Some(lifespan_hook_fn),
                lifespan: Some(StateLifespan::Tick),
            } = state_data
            else {
                continue;
            };
            (lifespan_hook_fn)(Box::deref_mut(state));
        }
        self.current_tick += crate::scheduled::ticks::TickDuration::SINGLE_TICK;
    }
}

/// A wrapper around an inline-codegen tick closure that provides [`Self::run`],
/// [`Self::run_available`], and [`Self::run_tick`] methods — mirroring the [`Dfir`](super::context::Dfir)
/// API.
///
/// # Design
///
/// The inline codegen generates an `async move |df: &mut Context|` closure that captures
/// dataflow-specific state (handoff buffers, source iterators) and receives the [`Context`]
/// (operator accumulators, tick counter) by reference each tick. `Dfir` owns both the
/// closure and the context, and coordinates tick lifecycle and idle/wake behavior.
///
/// We use a single opaque closure rather than generating a bespoke struct per dataflow because:
/// - The closure naturally captures exactly the state it needs with correct lifetimes
/// - No codegen needed for struct definitions, field accessors, or initialization
/// - Rust's async closure machinery handles the complex state machine (suspend/resume across
///   `.await` points) that would be very difficult to replicate in a generated struct
///
/// The `Tick` type parameter is bounded by [`TickClosure`] (not `AsyncFnMut` directly) to
/// support type erasure via [`TickClosureErased`] / [`DfirErased`] for heterogeneous
/// collections (e.g., the sim runtime storing multiple locations in a `Vec`). The concrete
/// (non-erased) path used by trybuild and embedded has zero overhead.
#[doc(hidden)]
pub struct Dfir<Tick> {
    /// Async closure which runs a single tick when called (via [`TickClosure::call_tick`]).
    tick_closure: Tick,
    /// Coordinates waking between [`Context`] (inside the tick closure) and [`Dfir`]
    /// (the external runner). Shared via `Arc` between both. Implements [`Wake`].
    /// This is a clone of the context's own `wake_state`, taken in [`Self::new`].
    wake_state: Arc<WakeState>,
    /// The inline context, owned by `Dfir` and passed to the tick closure by reference.
    context: Context,
    /// See [`Self::meta_graph()`]. Only present with the `meta` feature.
    #[cfg(feature = "meta")]
    meta_graph: Option<DfirGraph>,
    /// See [`Self::diagnostics()`]. Only present with the `meta` feature.
    #[cfg(feature = "meta")]
    diagnostics: Option<Vec<Diagnostic<SerdeSpan>>>,
}

/// Trait for tick closures — abstracts over both concrete async closures
/// and type-erased boxed versions ([`TickClosureErased`]).
///
/// The `&mut Context` parameter is owned by [`Dfir`] and lent to the
/// closure each tick, avoiding shared-ownership overhead for the context.
#[doc(hidden)]
pub trait TickClosure {
    /// Call the tick closure. Returns `true` if any subgraph received input data.
    ///
    /// The returned future borrows both `self` and `ctx` for `'a`, so neither can be used
    /// again until the future has completed or been dropped.
    fn call_tick<'a>(&'a mut self, ctx: &'a mut Context) -> impl Future<Output = bool> + 'a;
}

impl<F> TickClosure for F
where
    F: for<'a> AsyncFnMut(&'a mut Context) -> bool,
{
    /// Runs one tick by invoking the async closure directly; no boxing or indirection.
    fn call_tick<'a>(&'a mut self, ctx: &'a mut Context) -> impl Future<Output = bool> + 'a {
        self(ctx)
    }
}

/// A [`TickClosure`] that does nothing: each tick completes immediately and reports that
/// no work was done.
#[doc(hidden)]
pub struct NullTickClosure;

impl TickClosure for NullTickClosure {
    /// Ignores the context and resolves immediately to `false`.
    fn call_tick<'a>(&'a mut self, _ctx: &'a mut Context) -> impl Future<Output = bool> + 'a {
        let had_work = false;
        std::future::ready(had_work)
    }
}

/// Type-erased tick function for use in heterogeneous collections (e.g., the sim runtime).
///
/// Wraps a boxed [`TickClosureErasedInner`] trait object and implements [`TickClosure`] by
/// forwarding to it.
#[doc(hidden)]
pub struct TickClosureErased(Box<dyn TickClosureErasedInner>);

/// Object-safe inner trait for [`TickClosureErased`]. Needed because `AsyncFnMut` is not
/// object-safe (GAT return type), but a trait with `&mut self -> Pin<Box<dyn Future + '_>>`
/// is — the returned future borrows from the trait object which owns the closure.
trait TickClosureErasedInner {
    /// Runs one tick, returning the tick's future boxed and pinned so its concrete type is
    /// hidden. The future borrows both `self` and `ctx` for `'a`.
    fn call_tick<'a>(
        &'a mut self,
        ctx: &'a mut Context,
    ) -> Pin<Box<dyn Future<Output = bool> + 'a>>;
}

impl<F> TickClosureErasedInner for F
where
    F: for<'a> AsyncFnMut(&'a mut Context) -> bool,
{
    /// Invokes the async closure and boxes the resulting future (one heap allocation per
    /// call) so it can be returned through the object-safe trait.
    fn call_tick<'a>(
        &'a mut self,
        ctx: &'a mut Context,
    ) -> Pin<Box<dyn Future<Output = bool> + 'a>> {
        let tick_future = self(ctx);
        Box::pin(tick_future)
    }
}

impl TickClosure for TickClosureErased {
    /// Forwards to the boxed inner closure; the boxed future it returns is itself the
    /// future handed back to the caller.
    fn call_tick<'a>(&'a mut self, ctx: &'a mut Context) -> impl Future<Output = bool> + 'a {
        let Self(inner) = self;
        inner.call_tick(ctx)
    }
}

/// Type alias for a type-erased [`Dfir`] that can be stored in heterogeneous collections.
/// Created via [`Dfir::into_erased`].
///
/// All `DfirErased` values share the single tick-closure type [`TickClosureErased`],
/// regardless of which concrete closure they were built from.
pub type DfirErased = Dfir<TickClosureErased>;

impl<Tick: TickClosure> Dfir<Tick> {
    /// Create a new `Dfir` from a tick closure, inline context,
    /// and meta graph / diagnostics JSON strings.
    #[doc(hidden)]
    pub fn new(
        tick_closure: Tick,
        context: Context,
        meta_graph_json: Option<&str>,
        diagnostics_json: Option<&str>,
    ) -> Self {
        #[cfg(not(feature = "meta"))]
        let _ = (meta_graph_json, diagnostics_json);
        Self {
            tick_closure,
            wake_state: context.wake_state.clone(),
            context,
            #[cfg(feature = "meta")]
            meta_graph: meta_graph_json.map(|json| {
                let mut meta_graph: DfirGraph =
                    serde_json::from_str(json).expect("Failed to deserialize graph.");
                let mut op_inst_diagnostics = Diagnostics::new();
                meta_graph.insert_node_op_insts_all(&mut op_inst_diagnostics);
                assert!(
                    op_inst_diagnostics.is_empty(),
                    "Expected no diagnostics, got: {:#?}",
                    op_inst_diagnostics
                );
                meta_graph
            }),
            #[cfg(feature = "meta")]
            diagnostics: diagnostics_json.map(|json| {
                serde_json::from_str(json).expect("Failed to deserialize diagnostics.")
            }),
        }
    }

    /// Return a handle to the meta graph, if set.
    #[cfg(feature = "meta")]
    #[cfg_attr(docsrs, doc(cfg(feature = "meta")))]
    pub fn meta_graph(&self) -> Option<&DfirGraph> {
        self.meta_graph.as_ref()
    }

    /// Returns any diagnostics generated by the surface syntax macro.
    #[cfg(feature = "meta")]
    #[cfg_attr(docsrs, doc(cfg(feature = "meta")))]
    pub fn diagnostics(&self) -> Option<&[Diagnostic<SerdeSpan>]> {
        self.diagnostics.as_deref()
    }

    /// Returns a reference-counted handle to the continually-updated runtime metrics for this DFIR instance.
    pub fn metrics(&self) -> Rc<DfirMetrics> {
        Rc::clone(self.context.metrics())
    }

    /// Gets the current tick (local time) count.
    pub fn current_tick(&self) -> TickInstant {
        self.context.current_tick()
    }

    /// Returns a [`DfirMetricsIntervals`] handle where each call to
    /// [`DfirMetricsIntervals::take_interval`] ends the current interval and returns its metrics.
    ///
    /// The first call to `take_interval` returns metrics since this DFIR instance was created. Each subsequent call to
    /// `take_interval` returns metrics since the previous call.
    ///
    /// Cloning the handle "forks" it from the original, as afterwards each interval may return different metrics
    /// depending on when exactly `take_interval` is called.
    pub fn metrics_intervals(&self) -> DfirMetricsIntervals {
        DfirMetricsIntervals {
            curr: self.metrics(),
            prev: None,
        }
    }
}

impl<Tick: TickClosure> Dfir<Tick> {
    /// Spawns all tasks buffered via [`Context::request_task`].
    ///
    /// This drains the buffer, so subsequent calls are no-ops until new tasks are requested.
    fn spawn_tasks(&mut self) {
        for task in self.context.tasks_to_spawn.drain(..) {
            tokio::task::spawn_local(task);
        }
    }

    /// Run a single tick. Returns `true` if any subgraph received input data.
    ///
    /// Checks both handoff buffers (via `work_done` flag set in generated recv port code)
    /// and external events (via `can_start_tick` set by wakers/schedule_subgraph).
    pub async fn run_tick(&mut self) -> bool {
        self.spawn_tasks();
        let had_external = self
            .wake_state
            .can_start_tick
            .swap(false, Ordering::Relaxed);
        let tick_had_work = self.tick_closure.call_tick(&mut self.context).await;
        had_external || tick_had_work || self.wake_state.can_start_tick.load(Ordering::Relaxed)
    }

    /// Run a single tick synchronously. Panics if the tick yields (async suspension).
    /// Returns `true` if work was done (see [`Self::run_tick`]).
    pub fn run_tick_sync(&mut self) -> bool {
        let mut fut = std::pin::pin!(self.run_tick());
        let mut ctx = std::task::Context::from_waker(std::task::Waker::noop());
        match fut.as_mut().poll(&mut ctx) {
            std::task::Poll::Ready(result) => result,
            std::task::Poll::Pending => {
                panic!("Dfir::run_tick_sync: tick yielded asynchronously.")
            }
        }
    }

    /// Run ticks as long as work is available, then return.
    pub async fn run_available(&mut self) {
        // Always run at least one tick.
        self.wake_state
            .can_start_tick
            .store(false, Ordering::Relaxed);
        loop {
            self.run_tick().await;
            let can_start_tick = self
                .wake_state
                .can_start_tick
                .swap(false, Ordering::Relaxed);
            if !can_start_tick {
                break;
            }
            // Yield between each tick to receive more events.
            tokio::task::yield_now().await;
        }
    }

    /// [`Self::run_available`] but panics if any tick yields asynchronously.
    pub fn run_available_sync(&mut self) {
        self.wake_state
            .can_start_tick
            .store(false, Ordering::Relaxed);
        loop {
            self.run_tick_sync();
            let can_start_tick = self
                .wake_state
                .can_start_tick
                .swap(false, Ordering::Relaxed);
            if !can_start_tick {
                break;
            }
        }
    }

    /// Run forever, processing ticks when work is available and yielding when idle.
    pub async fn run(&mut self) -> crate::Never {
        loop {
            self.run_available().await;
            // Wait for an external event to wake us.
            std::future::poll_fn(|cx| {
                // Register waker first to avoid race: if an event fires between
                // the check and the register, the waker is already in place.
                self.wake_state.task_waker.register(cx.waker());
                if self.wake_state.can_start_tick.load(Ordering::Relaxed) {
                    std::task::Poll::Ready(())
                } else {
                    std::task::Poll::Pending
                }
            })
            .await;
        }
    }
}

impl<Tick: 'static + for<'a> AsyncFnMut(&'a mut Context) -> bool> Dfir<Tick> {
    /// Type-erase the tick closure for use in heterogeneous collections.
    ///
    /// Wraps the concrete async closure in [`TickClosureErased`], which boxes the future
    /// returned by each tick call. This adds one heap allocation per tick, but enables
    /// storing multiple `Dfir`s with different closure types in a single `Vec`.
    ///
    /// Only needed for the sim runtime path. The trybuild and embedded paths keep the
    /// concrete type and pay no erasure cost.
    pub fn into_erased(self) -> DfirErased {
        Dfir {
            tick_closure: TickClosureErased(Box::new(self.tick_closure)),
            wake_state: self.wake_state,
            context: self.context,
            #[cfg(feature = "meta")]
            meta_graph: self.meta_graph,
            #[cfg(feature = "meta")]
            diagnostics: self.diagnostics,
        }
    }
}