Skip to main content

uni_plugin/
lifecycle.rs

//! Plugin lifecycle state machine.
//!
//! Each plugin moves through a defined lifecycle:
//!
//! ```text
//!   Loaded → Linked → Initialized → Active → Draining → Removed
//! ```
//!
//! The state machine is encapsulated here so hot reload (M10 cutover)
//! can drain in-flight references through `Arc::clone` semantics before
//! swapping in the new instance.

13use std::sync::Arc;
14use std::sync::atomic::{AtomicU8, Ordering};
15
16use crate::plugin::PluginId;
17
/// One discrete step in a plugin's lifecycle.
///
/// Progression is monotonic-forward: a plugin starts at `Loaded`, ends at
/// `Removed`, and never steps backward along the way. A hot reload retires
/// the old generation and starts a brand-new one at `Loaded`.
///
/// The explicit `u8` discriminants are the encoding kept in
/// [`PluginLifecycle`]'s atomic state cell.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum LifecycleState {
    /// Source or bytes ingested and the manifest parsed.
    Loaded = 0,
    /// Capabilities negotiated, WIT linker configured, and `register()` invoked.
    Linked = 1,
    /// `init()` completed successfully, in dependency order.
    Initialized = 2,
    /// Registered and visible to query planning and execution.
    Active = 3,
    /// Excluded from new operations; in-flight transactions may still hold `Arc`s.
    Draining = 4,
    /// Every in-flight reference released and resources freed.
    Removed = 5,
}

41impl LifecycleState {
42    fn from_u8(v: u8) -> Self {
43        match v {
44            0 => Self::Loaded,
45            1 => Self::Linked,
46            2 => Self::Initialized,
47            3 => Self::Active,
48            4 => Self::Draining,
49            _ => Self::Removed,
50        }
51    }
52}
53
54/// Per-plugin lifecycle handle.
55///
56/// Constructed at `Uni::add_plugin` time; advanced through the states
57/// by the loader. The state is held in an `AtomicU8` so wait-free
58/// reads work from the query path (e.g., to short-circuit dispatch
59/// against a draining plugin).
60#[derive(Debug)]
61pub struct PluginLifecycle {
62    plugin: PluginId,
63    state: AtomicU8,
64}
65
66impl PluginLifecycle {
67    /// Construct in [`LifecycleState::Loaded`].
68    #[must_use]
69    pub fn new(plugin: PluginId) -> Self {
70        Self {
71            plugin,
72            state: AtomicU8::new(LifecycleState::Loaded as u8),
73        }
74    }
75
76    /// Current state.
77    #[must_use]
78    pub fn state(&self) -> LifecycleState {
79        LifecycleState::from_u8(self.state.load(Ordering::SeqCst))
80    }
81
82    /// Plugin id this handle tracks.
83    #[must_use]
84    pub fn plugin(&self) -> &PluginId {
85        &self.plugin
86    }
87
88    /// Advance to the next state. Returns the new state.
89    ///
90    /// Forward-only: invocation when already at [`LifecycleState::Removed`]
91    /// stays at `Removed`.
92    pub fn advance(&self) -> LifecycleState {
93        let cur = self.state.load(Ordering::SeqCst);
94        let next = match LifecycleState::from_u8(cur) {
95            LifecycleState::Loaded => LifecycleState::Linked,
96            LifecycleState::Linked => LifecycleState::Initialized,
97            LifecycleState::Initialized => LifecycleState::Active,
98            LifecycleState::Active => LifecycleState::Draining,
99            LifecycleState::Draining => LifecycleState::Removed,
100            LifecycleState::Removed => LifecycleState::Removed,
101        };
102        self.state.store(next as u8, Ordering::SeqCst);
103        next
104    }
105
106    /// Force-set the state. Useful for tests and for unwinding a failed
107    /// `register()` back to `Loaded`.
108    pub fn set(&self, s: LifecycleState) {
109        self.state.store(s as u8, Ordering::SeqCst);
110    }
111
112    /// Returns `true` if the plugin is in the `Active` state.
113    #[must_use]
114    pub fn is_active(&self) -> bool {
115        self.state() == LifecycleState::Active
116    }
117
118    /// Returns `true` if the plugin is in `Draining` or `Removed`.
119    #[must_use]
120    pub fn is_winding_down(&self) -> bool {
121        matches!(
122            self.state(),
123            LifecycleState::Draining | LifecycleState::Removed
124        )
125    }
126}
127
128/// Shared lifecycle handle suitable for capturing in plan / query closures.
129pub type SharedLifecycle = Arc<PluginLifecycle>;
130
// =========================================================================
// M10: Epoch-fenced reload driver
// =========================================================================

135/// Orchestrator for an atomic plugin reload — implements the
136/// proposal's §11.2 epoch-fenced cutover.
137///
138/// The reload protocol:
139///
140/// 1. The host calls [`EpochFencedReload::begin_drain`] on the **old**
141///    plugin's lifecycle. The state advances `Active → Draining`. New
142///    operations no longer capture the old plugin; in-flight ops that
143///    already captured an `Arc<dyn ScalarPluginFn>` continue against
144///    the old instance.
145/// 2. The host invokes the load path for the **new** plugin and walks
146///    its lifecycle `Loaded → Linked → Initialized → Active`.
147/// 3. The host calls [`EpochFencedReload::wait_for_drain`], polling
148///    `Arc::strong_count` on the old plugin's lifecycle. When the
149///    only outstanding reference is the framework's bookkeeping `Arc`,
150///    every in-flight operation against the old plugin has completed.
151/// 4. The host advances the old plugin's lifecycle to `Removed` via
152///    [`EpochFencedReload::finalize`].
153///
154/// The driver is decoupled from per-kind reload discipline (per
155/// proposal §11.2.1) — that lives in the host's storage / index /
156/// background-job specific code. This driver provides the general
157/// state-machine orchestration.
158///
159/// `EpochFencedReload` is intentionally synchronous (no `async fn`)
160/// so it can be called from any context. For long-drain scenarios
161/// (e.g., 10s wall-clock queries against a storage backend), the
162/// host wraps the polling loop in its own async runtime.
163#[derive(Debug)]
164pub struct EpochFencedReload {
165    /// Lifecycle of the plugin being drained.
166    old: Arc<PluginLifecycle>,
167}
168
169impl EpochFencedReload {
170    /// Construct a driver for an old-plugin lifecycle.
171    #[must_use]
172    pub fn new(old: Arc<PluginLifecycle>) -> Self {
173        Self { old }
174    }
175
176    /// Begin draining the old plugin. Advances state `Active → Draining`.
177    ///
178    /// # Errors
179    ///
180    /// Returns `Err` if the old plugin is not in `Active` state — the
181    /// caller can choose to retry or bail out depending on policy.
182    pub fn begin_drain(&self) -> Result<(), DrainError> {
183        if !self.old.is_active() {
184            return Err(DrainError::NotActive {
185                current: self.old.state(),
186            });
187        }
188        // Single atomic advance Active → Draining.
189        let new = self.old.advance();
190        if new != LifecycleState::Draining {
191            return Err(DrainError::UnexpectedTransition { reached: new });
192        }
193        Ok(())
194    }
195
196    /// Wait until the old plugin has been fully drained.
197    ///
198    /// Polls `Arc::strong_count(&self.old)` with the supplied poll
199    /// interval. Returns when the count drops to `threshold` (the
200    /// number of bookkeeping `Arc`s the framework holds), or
201    /// `Err(DrainError::Timeout)` if `max_wait` elapses first.
202    ///
203    /// `threshold` is typically 1 (just the framework's own
204    /// `EpochFencedReload::old` Arc); pass 2 if the host also keeps
205    /// a side reference for diagnostics.
206    ///
207    /// # Errors
208    ///
209    /// Returns [`DrainError::Timeout`] if the strong-count doesn't
210    /// drop to `threshold` within `max_wait`.
211    pub fn wait_for_drain(
212        &self,
213        threshold: usize,
214        poll_interval: std::time::Duration,
215        max_wait: std::time::Duration,
216    ) -> Result<(), DrainError> {
217        let start = std::time::Instant::now();
218        loop {
219            let count = Arc::strong_count(&self.old);
220            if count <= threshold {
221                return Ok(());
222            }
223            if start.elapsed() >= max_wait {
224                return Err(DrainError::Timeout {
225                    waited: start.elapsed(),
226                    strong_count: count,
227                });
228            }
229            std::thread::sleep(poll_interval);
230        }
231    }
232
233    /// Finalize the drain — advance to `Removed`.
234    ///
235    /// Typically called only after [`Self::wait_for_drain`] succeeds.
236    /// Safe to call multiple times (idempotent at `Removed`).
237    pub fn finalize(&self) {
238        // Drive forward to Removed regardless of where we are.
239        while self.old.state() != LifecycleState::Removed {
240            self.old.advance();
241        }
242    }
243
244    /// Shared access to the old plugin's lifecycle (e.g., for
245    /// observability hooks reading the current state).
246    #[must_use]
247    pub fn old_lifecycle(&self) -> &Arc<PluginLifecycle> {
248        &self.old
249    }
250}
251
252/// Drain operation errors.
253#[derive(Debug, thiserror::Error)]
254pub enum DrainError {
255    /// `begin_drain` called on a plugin not in `Active`.
256    #[error("cannot drain: plugin is in {current:?}, not Active")]
257    NotActive {
258        /// The current lifecycle state.
259        current: LifecycleState,
260    },
261
262    /// `advance()` from `Active` returned an unexpected state. Should
263    /// never happen with the current state machine.
264    #[error("unexpected lifecycle transition: reached {reached:?}")]
265    UnexpectedTransition {
266        /// State we landed in unexpectedly.
267        reached: LifecycleState,
268    },
269
270    /// `wait_for_drain` timed out without the strong-count dropping.
271    #[error(
272        "drain timed out after {waited:?}; strong_count remained {strong_count} (threshold not reached)"
273    )]
274    Timeout {
275        /// Elapsed wait time.
276        waited: std::time::Duration,
277        /// Final observed Arc strong count.
278        strong_count: usize,
279    },
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// New handle for plugin `id`, still in its initial state.
    fn fresh(id: &'static str) -> Arc<PluginLifecycle> {
        Arc::new(PluginLifecycle::new(PluginId::new(id)))
    }

    /// New handle for plugin `id`, forced straight into `Active`.
    fn active(id: &'static str) -> Arc<PluginLifecycle> {
        let lifecycle = fresh(id);
        lifecycle.set(LifecycleState::Active);
        lifecycle
    }

    /// Shorthand for a millisecond [`Duration`].
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    #[test]
    fn lifecycle_starts_at_loaded() {
        assert_eq!(fresh("x").state(), LifecycleState::Loaded);
    }

    #[test]
    fn advance_progresses_through_states() {
        let lifecycle = fresh("x");
        // (state reached, expected is_active, expected is_winding_down)
        let steps = [
            (LifecycleState::Linked, false, false),
            (LifecycleState::Initialized, false, false),
            (LifecycleState::Active, true, false),
            (LifecycleState::Draining, false, true),
            (LifecycleState::Removed, false, true),
            // Already at Removed — stays put.
            (LifecycleState::Removed, false, true),
        ];
        for (reached, is_active, winding_down) in steps {
            assert_eq!(lifecycle.advance(), reached);
            assert_eq!(lifecycle.is_active(), is_active);
            assert_eq!(lifecycle.is_winding_down(), winding_down);
        }
    }

    #[test]
    fn set_is_explicit_state_override() {
        let lifecycle = fresh("x");
        lifecycle.set(LifecycleState::Active);
        assert!(lifecycle.is_active());
    }

    // ── EpochFencedReload tests ────────────────────────────────────

    #[test]
    fn epoch_drain_advances_state_from_active() {
        let lifecycle = active("x");
        EpochFencedReload::new(Arc::clone(&lifecycle))
            .begin_drain()
            .unwrap();
        assert_eq!(lifecycle.state(), LifecycleState::Draining);
    }

    #[test]
    fn epoch_drain_rejects_non_active_state() {
        // A fresh handle sits at `Loaded`, so draining must be refused.
        let driver = EpochFencedReload::new(fresh("x"));
        match driver.begin_drain() {
            Err(DrainError::NotActive { current }) => {
                assert_eq!(current, LifecycleState::Loaded);
            }
            other => panic!("expected NotActive, got {other:?}"),
        }
    }

    #[test]
    fn wait_for_drain_returns_immediately_when_below_threshold() {
        let lifecycle = active("x");
        let driver = EpochFencedReload::new(Arc::clone(&lifecycle));
        driver.begin_drain().unwrap();
        // Exactly two holders (`lifecycle` and the driver) meet threshold 2.
        driver
            .wait_for_drain(2, ms(1), ms(100))
            .expect("should drain immediately");
    }

    #[test]
    fn wait_for_drain_times_out_when_references_persist() {
        let lifecycle = active("x");
        // Stands in for an in-flight operation still holding the old plugin.
        let in_flight = Arc::clone(&lifecycle);
        let driver = EpochFencedReload::new(Arc::clone(&lifecycle));
        driver.begin_drain().unwrap();
        // Three holders (lifecycle, in_flight, driver) can never reach 1.
        match driver.wait_for_drain(1, ms(1), ms(20)) {
            Err(DrainError::Timeout { strong_count, .. }) => assert!(strong_count >= 3),
            other => panic!("expected Timeout, got {other:?}"),
        }
        drop(in_flight);
    }

    #[test]
    fn finalize_advances_to_removed() {
        let lifecycle = active("x");
        let driver = EpochFencedReload::new(Arc::clone(&lifecycle));
        driver.begin_drain().unwrap();
        driver.finalize();
        assert_eq!(lifecycle.state(), LifecycleState::Removed);
    }

    #[test]
    fn finalize_is_idempotent_at_removed() {
        let lifecycle = fresh("x");
        lifecycle.set(LifecycleState::Removed);
        let driver = EpochFencedReload::new(Arc::clone(&lifecycle));
        for _ in 0..2 {
            driver.finalize();
        }
        assert_eq!(lifecycle.state(), LifecycleState::Removed);
    }

    #[test]
    fn epoch_fenced_reload_end_to_end() {
        // Realistic flow: the old plugin is Active; the host starts the
        // drain, lets go of its extra reference, the wait succeeds, and
        // finalize retires the plugin.
        let lifecycle = active("plugin.geo");
        let host_ref = Arc::clone(&lifecycle);
        let driver = EpochFencedReload::new(Arc::clone(&lifecycle));
        driver.begin_drain().expect("drain begin");

        // Holders right now: lifecycle, host_ref, driver. The host lets go.
        drop(host_ref);

        // Two holders remain (lifecycle, driver), matching threshold 2.
        driver
            .wait_for_drain(2, ms(1), Duration::from_secs(1))
            .expect("wait_for_drain");
        driver.finalize();
        assert_eq!(lifecycle.state(), LifecycleState::Removed);
    }
}