uni_plugin/lifecycle.rs
1//! Plugin lifecycle state machine.
2//!
3//! Each plugin moves through a defined lifecycle:
4//!
5//! ```text
6//! Loaded → Linked → Initialized → Active → Draining → Removed
7//! ```
8//!
9//! The state machine is encapsulated here so hot reload (M10 cutover)
10//! can drain in-flight references through `Arc::clone` semantics before
11//! swapping in the new instance.
12
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU8, Ordering};
15
16use crate::plugin::PluginId;
17
/// Discrete lifecycle state.
///
/// [`PluginLifecycle::advance`] is monotonic-forward: it moves a plugin
/// from `Loaded` towards `Removed` and never backward. Hot reload
/// removes the old generation and starts a new one at `Loaded`.
/// ([`PluginLifecycle::set`] can still force any state, e.g. to unwind a
/// failed `register()`.)
///
/// The derived `PartialOrd`/`Ord` compare by discriminant, which matches
/// lifecycle order. Comparisons such as `state >= LifecycleState::Draining`
/// therefore express lifecycle progress. `Hash` lets states key maps and
/// sets.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum LifecycleState {
    /// Bytes/source ingested, manifest parsed.
    Loaded = 0,
    /// Capabilities negotiated, WIT linker configured, `register()` called.
    Linked = 1,
    /// `init()` ran successfully in dependency order.
    Initialized = 2,
    /// In the registry; visible to query planning and execution.
    Active = 3,
    /// Removed from new operations; in-flight tx may still hold Arcs.
    Draining = 4,
    /// All in-flight references released; resources freed.
    Removed = 5,
}
40
41impl LifecycleState {
42 fn from_u8(v: u8) -> Self {
43 match v {
44 0 => Self::Loaded,
45 1 => Self::Linked,
46 2 => Self::Initialized,
47 3 => Self::Active,
48 4 => Self::Draining,
49 _ => Self::Removed,
50 }
51 }
52}
53
54/// Per-plugin lifecycle handle.
55///
56/// Constructed at `Uni::add_plugin` time; advanced through the states
57/// by the loader. The state is held in an `AtomicU8` so wait-free
58/// reads work from the query path (e.g., to short-circuit dispatch
59/// against a draining plugin).
60#[derive(Debug)]
61pub struct PluginLifecycle {
62 plugin: PluginId,
63 state: AtomicU8,
64}
65
66impl PluginLifecycle {
67 /// Construct in [`LifecycleState::Loaded`].
68 #[must_use]
69 pub fn new(plugin: PluginId) -> Self {
70 Self {
71 plugin,
72 state: AtomicU8::new(LifecycleState::Loaded as u8),
73 }
74 }
75
76 /// Current state.
77 #[must_use]
78 pub fn state(&self) -> LifecycleState {
79 LifecycleState::from_u8(self.state.load(Ordering::SeqCst))
80 }
81
82 /// Plugin id this handle tracks.
83 #[must_use]
84 pub fn plugin(&self) -> &PluginId {
85 &self.plugin
86 }
87
88 /// Advance to the next state. Returns the new state.
89 ///
90 /// Forward-only: invocation when already at [`LifecycleState::Removed`]
91 /// stays at `Removed`.
92 pub fn advance(&self) -> LifecycleState {
93 let cur = self.state.load(Ordering::SeqCst);
94 let next = match LifecycleState::from_u8(cur) {
95 LifecycleState::Loaded => LifecycleState::Linked,
96 LifecycleState::Linked => LifecycleState::Initialized,
97 LifecycleState::Initialized => LifecycleState::Active,
98 LifecycleState::Active => LifecycleState::Draining,
99 LifecycleState::Draining => LifecycleState::Removed,
100 LifecycleState::Removed => LifecycleState::Removed,
101 };
102 self.state.store(next as u8, Ordering::SeqCst);
103 next
104 }
105
106 /// Force-set the state. Useful for tests and for unwinding a failed
107 /// `register()` back to `Loaded`.
108 pub fn set(&self, s: LifecycleState) {
109 self.state.store(s as u8, Ordering::SeqCst);
110 }
111
112 /// Returns `true` if the plugin is in the `Active` state.
113 #[must_use]
114 pub fn is_active(&self) -> bool {
115 self.state() == LifecycleState::Active
116 }
117
118 /// Returns `true` if the plugin is in `Draining` or `Removed`.
119 #[must_use]
120 pub fn is_winding_down(&self) -> bool {
121 matches!(
122 self.state(),
123 LifecycleState::Draining | LifecycleState::Removed
124 )
125 }
126}
127
128/// Shared lifecycle handle suitable for capturing in plan / query closures.
129pub type SharedLifecycle = Arc<PluginLifecycle>;
130
131// =========================================================================
132// M10: Epoch-fenced reload driver
133// =========================================================================
134
135/// Orchestrator for an atomic plugin reload — implements the
136/// proposal's §11.2 epoch-fenced cutover.
137///
138/// The reload protocol:
139///
140/// 1. The host calls [`EpochFencedReload::begin_drain`] on the **old**
141/// plugin's lifecycle. The state advances `Active → Draining`. New
142/// operations no longer capture the old plugin; in-flight ops that
143/// already captured an `Arc<dyn ScalarPluginFn>` continue against
144/// the old instance.
145/// 2. The host invokes the load path for the **new** plugin and walks
146/// its lifecycle `Loaded → Linked → Initialized → Active`.
147/// 3. The host calls [`EpochFencedReload::wait_for_drain`], polling
148/// `Arc::strong_count` on the old plugin's lifecycle. When the
149/// only outstanding reference is the framework's bookkeeping `Arc`,
150/// every in-flight operation against the old plugin has completed.
151/// 4. The host advances the old plugin's lifecycle to `Removed` via
152/// [`EpochFencedReload::finalize`].
153///
154/// The driver is decoupled from per-kind reload discipline (per
155/// proposal §11.2.1) — that lives in the host's storage / index /
156/// background-job specific code. This driver provides the general
157/// state-machine orchestration.
158///
159/// `EpochFencedReload` is intentionally synchronous (no `async fn`)
160/// so it can be called from any context. For long-drain scenarios
161/// (e.g., 10s wall-clock queries against a storage backend), the
162/// host wraps the polling loop in its own async runtime.
163#[derive(Debug)]
164pub struct EpochFencedReload {
165 /// Lifecycle of the plugin being drained.
166 old: Arc<PluginLifecycle>,
167}
168
169impl EpochFencedReload {
170 /// Construct a driver for an old-plugin lifecycle.
171 #[must_use]
172 pub fn new(old: Arc<PluginLifecycle>) -> Self {
173 Self { old }
174 }
175
176 /// Begin draining the old plugin. Advances state `Active → Draining`.
177 ///
178 /// # Errors
179 ///
180 /// Returns `Err` if the old plugin is not in `Active` state — the
181 /// caller can choose to retry or bail out depending on policy.
182 pub fn begin_drain(&self) -> Result<(), DrainError> {
183 if !self.old.is_active() {
184 return Err(DrainError::NotActive {
185 current: self.old.state(),
186 });
187 }
188 // Single atomic advance Active → Draining.
189 let new = self.old.advance();
190 if new != LifecycleState::Draining {
191 return Err(DrainError::UnexpectedTransition { reached: new });
192 }
193 Ok(())
194 }
195
196 /// Wait until the old plugin has been fully drained.
197 ///
198 /// Polls `Arc::strong_count(&self.old)` with the supplied poll
199 /// interval. Returns when the count drops to `threshold` (the
200 /// number of bookkeeping `Arc`s the framework holds), or
201 /// `Err(DrainError::Timeout)` if `max_wait` elapses first.
202 ///
203 /// `threshold` is typically 1 (just the framework's own
204 /// `EpochFencedReload::old` Arc); pass 2 if the host also keeps
205 /// a side reference for diagnostics.
206 ///
207 /// # Errors
208 ///
209 /// Returns [`DrainError::Timeout`] if the strong-count doesn't
210 /// drop to `threshold` within `max_wait`.
211 pub fn wait_for_drain(
212 &self,
213 threshold: usize,
214 poll_interval: std::time::Duration,
215 max_wait: std::time::Duration,
216 ) -> Result<(), DrainError> {
217 let start = std::time::Instant::now();
218 loop {
219 let count = Arc::strong_count(&self.old);
220 if count <= threshold {
221 return Ok(());
222 }
223 if start.elapsed() >= max_wait {
224 return Err(DrainError::Timeout {
225 waited: start.elapsed(),
226 strong_count: count,
227 });
228 }
229 std::thread::sleep(poll_interval);
230 }
231 }
232
233 /// Finalize the drain — advance to `Removed`.
234 ///
235 /// Typically called only after [`Self::wait_for_drain`] succeeds.
236 /// Safe to call multiple times (idempotent at `Removed`).
237 pub fn finalize(&self) {
238 // Drive forward to Removed regardless of where we are.
239 while self.old.state() != LifecycleState::Removed {
240 self.old.advance();
241 }
242 }
243
244 /// Shared access to the old plugin's lifecycle (e.g., for
245 /// observability hooks reading the current state).
246 #[must_use]
247 pub fn old_lifecycle(&self) -> &Arc<PluginLifecycle> {
248 &self.old
249 }
250}
251
252/// Drain operation errors.
253#[derive(Debug, thiserror::Error)]
254pub enum DrainError {
255 /// `begin_drain` called on a plugin not in `Active`.
256 #[error("cannot drain: plugin is in {current:?}, not Active")]
257 NotActive {
258 /// The current lifecycle state.
259 current: LifecycleState,
260 },
261
262 /// `advance()` from `Active` returned an unexpected state. Should
263 /// never happen with the current state machine.
264 #[error("unexpected lifecycle transition: reached {reached:?}")]
265 UnexpectedTransition {
266 /// State we landed in unexpectedly.
267 reached: LifecycleState,
268 },
269
270 /// `wait_for_drain` timed out without the strong-count dropping.
271 #[error(
272 "drain timed out after {waited:?}; strong_count remained {strong_count} (threshold not reached)"
273 )]
274 Timeout {
275 /// Elapsed wait time.
276 waited: std::time::Duration,
277 /// Final observed Arc strong count.
278 strong_count: usize,
279 },
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Shareable lifecycle for `name`, left in its initial `Loaded` state.
    fn fresh(name: &'static str) -> Arc<PluginLifecycle> {
        Arc::new(PluginLifecycle::new(PluginId::new(name)))
    }

    /// Shareable lifecycle for `name`, forced straight into `Active`.
    fn active(name: &'static str) -> Arc<PluginLifecycle> {
        let handle = fresh(name);
        handle.set(LifecycleState::Active);
        handle
    }

    #[test]
    fn lifecycle_starts_at_loaded() {
        assert_eq!(fresh("x").state(), LifecycleState::Loaded);
    }

    #[test]
    fn advance_progresses_through_states() {
        let handle = fresh("x");
        let steps = [
            LifecycleState::Linked,
            LifecycleState::Initialized,
            LifecycleState::Active,
            LifecycleState::Draining,
            LifecycleState::Removed,
        ];
        for expected in steps {
            assert_eq!(handle.advance(), expected);
            assert_eq!(handle.is_active(), expected == LifecycleState::Active);
            assert_eq!(
                handle.is_winding_down(),
                matches!(expected, LifecycleState::Draining | LifecycleState::Removed)
            );
        }
        // Saturates at the terminal state.
        assert_eq!(handle.advance(), LifecycleState::Removed);
    }

    #[test]
    fn set_is_explicit_state_override() {
        assert!(active("x").is_active());
    }

    // ── EpochFencedReload tests ────────────────────────────────────

    #[test]
    fn epoch_drain_advances_state_from_active() {
        let handle = active("x");
        let driver = EpochFencedReload::new(Arc::clone(&handle));
        driver.begin_drain().unwrap();
        assert_eq!(handle.state(), LifecycleState::Draining);
    }

    #[test]
    fn epoch_drain_rejects_non_active_state() {
        // Never advanced, so still `Loaded`.
        let driver = EpochFencedReload::new(fresh("x"));
        let err = driver.begin_drain().unwrap_err();
        assert!(
            matches!(
                err,
                DrainError::NotActive {
                    current: LifecycleState::Loaded
                }
            ),
            "expected NotActive, got {err:?}"
        );
    }

    #[test]
    fn wait_for_drain_returns_immediately_when_below_threshold() {
        let handle = active("x");
        let driver = EpochFencedReload::new(Arc::clone(&handle));
        driver.begin_drain().unwrap();
        // Two strong refs (`handle` + the driver's) meet threshold 2 at once.
        driver
            .wait_for_drain(2, Duration::from_millis(1), Duration::from_millis(100))
            .expect("should drain immediately");
    }

    #[test]
    fn wait_for_drain_times_out_when_references_persist() {
        let handle = active("x");
        let lingering = Arc::clone(&handle);
        let driver = EpochFencedReload::new(Arc::clone(&handle));
        driver.begin_drain().unwrap();
        // `handle`, `lingering` and the driver's clone keep the count at 3,
        // so threshold 1 is unreachable.
        let err = driver
            .wait_for_drain(1, Duration::from_millis(1), Duration::from_millis(20))
            .unwrap_err();
        assert!(
            matches!(err, DrainError::Timeout { strong_count, .. } if strong_count >= 3),
            "expected Timeout, got {err:?}"
        );
        drop(lingering); // release the captured ref
    }

    #[test]
    fn finalize_advances_to_removed() {
        let handle = active("x");
        let driver = EpochFencedReload::new(Arc::clone(&handle));
        driver.begin_drain().unwrap();
        driver.finalize();
        assert_eq!(handle.state(), LifecycleState::Removed);
    }

    #[test]
    fn finalize_is_idempotent_at_removed() {
        let handle = fresh("x");
        handle.set(LifecycleState::Removed);
        let driver = EpochFencedReload::new(Arc::clone(&handle));
        for _ in 0..2 {
            driver.finalize();
        }
        assert_eq!(handle.state(), LifecycleState::Removed);
    }

    #[test]
    fn epoch_fenced_reload_end_to_end() {
        // Realistic flow: an Active plugin starts draining, the host lets
        // go of its non-bookkeeping reference, wait_for_drain succeeds and
        // finalize moves the old generation to Removed.
        let handle = active("plugin.geo");

        // Host bookkeeping reference plus the driver's own.
        let host_ref = Arc::clone(&handle);
        let driver = EpochFencedReload::new(Arc::clone(&handle));
        driver.begin_drain().expect("drain begin");

        // Count is 3 (`handle`, `host_ref`, driver); the host releases its ref.
        drop(host_ref);

        // Count is now 2, which meets threshold 2 straight away.
        driver
            .wait_for_drain(2, Duration::from_millis(1), Duration::from_secs(1))
            .expect("wait_for_drain");
        driver.finalize();
        assert_eq!(handle.state(), LifecycleState::Removed);
    }
}