Skip to main content

lash_core/runtime/process/
awaiter.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tokio::sync::watch;
6
7use super::events::{
8    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
9};
10use super::model::{
11    AbandonRequest, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
12    ProcessHandleGrantEntry, ProcessLease, ProcessLeaseClaimOutcome, ProcessLeaseCompletion,
13    ProcessListFilter, ProcessRecord, ProcessRegistration, ProcessSessionDeleteReport,
14    ProcessStarted, SessionScope, WaitState,
15};
16use super::registry::{ProcessPruneReport, ProcessRegistry};
17use crate::PluginError;
18
/// Floor of the awaiter's polling backoff: the first sleep between point
/// reads, and the value the backoff resets to after a hub change tick.
const AWAIT_BACKOFF_MIN: Duration = Duration::from_millis(25);
/// Ceiling of the awaiter's polling backoff; `next_backoff` never exceeds it.
const AWAIT_BACKOFF_MAX: Duration = Duration::from_millis(1_000);
21
22#[derive(Clone, Default)]
23pub struct ProcessChangeHub {
24    inner: Arc<Mutex<HashMap<String, watch::Sender<u64>>>>,
25}
26
27impl ProcessChangeHub {
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    /// Subscribe before reading a process row. The receiver carries only a
33    /// version counter; waiters always re-read the registry after a bump.
34    pub fn subscribe(&self, process_id: &str) -> watch::Receiver<u64> {
35        let mut guard = self.inner.lock().expect("process change hub lock poisoned");
36        guard
37            .entry(process_id.to_string())
38            .or_insert_with(|| {
39                let (tx, _rx) = watch::channel(0);
40                tx
41            })
42            .subscribe()
43    }
44
45    pub fn notify(&self, process_id: &str) {
46        let mut guard = self.inner.lock().expect("process change hub lock poisoned");
47        let mut remove = false;
48        if let Some(tx) = guard.get(process_id) {
49            if tx.receiver_count() == 0 {
50                remove = true;
51            } else {
52                let next = (*tx.borrow()).wrapping_add(1);
53                if tx.send(next).is_err() {
54                    remove = true;
55                }
56            }
57        }
58        if remove {
59            guard.remove(process_id);
60        }
61    }
62
63    #[cfg(test)]
64    fn tracked_processes(&self) -> usize {
65        self.inner
66            .lock()
67            .expect("process change hub lock poisoned")
68            .len()
69    }
70}
71
72/// Host-facing, best-effort push of each appended process event.
73///
74/// A sink is an optional freshness feed, **never a source of truth.** The
75/// durable event log ([`ProcessRegistry::events_after`]) is the only complete
76/// record; a sink lets a host observe appends promptly without polling, but it
77/// makes no delivery promise.
78///
79/// # Contract
80///
81/// - **Best-effort freshness, never truth.** [`WatchedProcessRegistry`] calls
82///   [`emit`](Self::emit) after a successful `append_event`, in that pod's
83///   per-process append order. There is no buffering, no retry, and no
84///   delivery guarantee across pod crashes or restarts: an event that was
85///   appended durably may never reach the sink (e.g. the pod died between the
86///   durable write and the emit). Consumers that need completeness reconcile
87///   from `events_after` — the durable log is authoritative — typically at
88///   terminal time.
89/// - **Terminal events are deliberately NOT emitted through the sink.**
90///   [`ProcessRegistry::complete_process`] appends its terminal event via the
91///   *inner* registry internally, so the decorator never observes it as an
92///   `append_event` and never emits it. Do not wait on the sink for
93///   completion: terminal observation rides
94///   [`ProcessWorkDriver::await_terminal`](crate::ProcessWorkDriver::await_terminal)
95///   (see ADR 0016), which reads the durable terminal state.
96/// - **Emission cannot fail the write.** `emit` returns `()`, so a sink can
97///   never fail or roll back an append; the durable write has already
98///   committed by the time `emit` runs. But the decorator *awaits* `emit`
99///   inline on the append path, so a slow sink slows every append. Implementors
100///   must return fast: hand any real I/O off to a channel or background task
101///   internally rather than blocking inside `emit`.
102///
103/// # Example: offload to a channel
104///
105/// A sink must return fast, so a real implementation hands each event to a
106/// channel and does its projection/logging on a consumer task. Dropping on a
107/// full channel is the correct best-effort behavior — the durable log, read via
108/// `events_after`, remains the reconcile source.
109///
110/// ```
111/// use lash_core::{ProcessEvent, ProcessEventSink};
112/// use tokio::sync::mpsc;
113///
114/// struct ChannelSink {
115///     tx: mpsc::Sender<ProcessEvent>,
116/// }
117///
118/// #[async_trait::async_trait]
119/// impl ProcessEventSink for ChannelSink {
120///     async fn emit(&self, event: &ProcessEvent) {
121///         // Non-blocking: drop on a full channel rather than slow the append.
122///         let _ = self.tx.try_send(event.clone());
123///     }
124/// }
125/// ```
126#[async_trait::async_trait]
127pub trait ProcessEventSink: Send + Sync {
128    /// Observe one appended process event. Best-effort; see the trait contract.
129    ///
130    /// Must be fast and non-blocking — offload I/O to a channel/task internally.
131    async fn emit(&self, event: &ProcessEvent);
132}
133
134/// [`ProcessRegistry`] decorator: publishes in-process change ticks on every
135/// mutation (so [`ProcessAwaiter`] wakes without polling) and, when a
136/// [`ProcessEventSink`] is installed, emits each appended event to it.
137///
138/// The sink is installed once at wrap time via
139/// [`watch_process_registry_with_sink`]; there is no post-hoc mutation and no
140/// double-wrapping.
141struct WatchedProcessRegistry {
142    inner: Arc<dyn ProcessRegistry>,
143    hub: ProcessChangeHub,
144    sink: Option<Arc<dyn ProcessEventSink>>,
145}
146
147/// Wrap `inner` in a [`WatchedProcessRegistry`] with no event sink.
148///
149/// The decorated handle publishes change ticks to the returned
150/// [`ProcessChangeHub`]. Use [`watch_process_registry_with_sink`] to also feed a
151/// host-facing [`ProcessEventSink`].
152pub fn watch_process_registry(
153    inner: Arc<dyn ProcessRegistry>,
154) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
155    watch_process_registry_with_sink(inner, None)
156}
157
158/// Wrap `inner` in a [`WatchedProcessRegistry`], optionally installing a
159/// [`ProcessEventSink`] that receives every appended event.
160///
161/// The sink is best-effort freshness, not truth — see [`ProcessEventSink`].
162pub fn watch_process_registry_with_sink(
163    inner: Arc<dyn ProcessRegistry>,
164    sink: Option<Arc<dyn ProcessEventSink>>,
165) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
166    let hub = ProcessChangeHub::new();
167    (
168        Arc::new(WatchedProcessRegistry {
169            inner,
170            hub: hub.clone(),
171            sink,
172        }),
173        hub,
174    )
175}
176
177/// Core waiter for process terminal state and events (ADR 0016).
178///
179/// The awaiter is the store-only fallback that
180/// [`ProcessWorkDriver`](crate::ProcessWorkDriver) uses when no engine-native
181/// [`ProcessAttach`] owns the wait. It performs narrow point reads
182/// (`get_process`, `events_after`) and, when constructed with a
183/// [`ProcessChangeHub`], wakes promptly on local mutations instead of polling.
184/// Callers still bound every wait with [`tokio::time::timeout`].
185#[derive(Clone)]
186pub struct ProcessAwaiter {
187    registry: Arc<dyn ProcessRegistry>,
188    hub: Option<ProcessChangeHub>,
189}
190
191impl ProcessAwaiter {
192    /// Hub-backed awaiter: local mutations published to `hub` wake waiters
193    /// without database polling. This is what a [`WatchedProcessRegistry`]
194    /// wrapping provides via [`watch_process_registry`].
195    pub fn new(registry: Arc<dyn ProcessRegistry>, hub: ProcessChangeHub) -> Self {
196        Self {
197            registry,
198            hub: Some(hub),
199        }
200    }
201
202    /// Hubless awaiter: correct without any change signal, using only the
203    /// bounded backoff point-read loop (25ms floor, doubling, 1s cap). Use when
204    /// the registry is not wrapped in-process — e.g. a store-only test.
205    pub fn polling(registry: Arc<dyn ProcessRegistry>) -> Self {
206        Self {
207            registry,
208            hub: None,
209        }
210    }
211
212    /// Resolve once `process_id` is terminal, returning its outcome. See
213    /// [`ProcessWorkDriver::await_terminal`](crate::ProcessWorkDriver::await_terminal)
214    /// for the timeout-bounding contract.
215    pub async fn await_terminal(
216        &self,
217        process_id: &str,
218    ) -> Result<ProcessAwaitOutput, PluginError> {
219        let mut backoff = AWAIT_BACKOFF_MIN;
220        if let Some(hub) = self.hub.as_ref() {
221            let mut rx = hub.subscribe(process_id);
222            loop {
223                if let Some(output) = self.read_terminal(process_id).await? {
224                    return Ok(output);
225                }
226                tokio::select! {
227                    changed = rx.changed() => {
228                        match changed {
229                            Ok(()) => backoff = AWAIT_BACKOFF_MIN,
230                            // Sender dropped (unreachable today given the hub
231                            // GC invariant, but latent): a dead receiver would
232                            // otherwise fire immediately on every loop turn.
233                            // Stop selecting on it and degrade to the
234                            // sleep-only backoff loop below.
235                            Err(_) => break,
236                        }
237                    }
238                    _ = tokio::time::sleep(backoff) => {
239                        backoff = next_backoff(backoff);
240                    }
241                }
242            }
243        }
244        loop {
245            if let Some(output) = self.read_terminal(process_id).await? {
246                return Ok(output);
247            }
248            tokio::time::sleep(backoff).await;
249            backoff = next_backoff(backoff);
250        }
251    }
252
253    /// Resolve with the first `event_type` event on `process_id` past
254    /// `after_sequence`. Historical matches resolve immediately.
255    pub async fn await_event(
256        &self,
257        process_id: &str,
258        event_type: &str,
259        after_sequence: u64,
260    ) -> Result<ProcessEvent, PluginError> {
261        let mut backoff = AWAIT_BACKOFF_MIN;
262        if let Some(hub) = self.hub.as_ref() {
263            let mut rx = hub.subscribe(process_id);
264            loop {
265                if let Some(event) = self
266                    .read_event(process_id, event_type, after_sequence)
267                    .await?
268                {
269                    return Ok(event);
270                }
271                tokio::select! {
272                    changed = rx.changed() => {
273                        match changed {
274                            Ok(()) => backoff = AWAIT_BACKOFF_MIN,
275                            // Sender dropped (unreachable today given the hub
276                            // GC invariant, but latent): a dead receiver would
277                            // otherwise fire immediately on every loop turn.
278                            // Stop selecting on it and degrade to the
279                            // sleep-only backoff loop below.
280                            Err(_) => break,
281                        }
282                    }
283                    _ = tokio::time::sleep(backoff) => {
284                        backoff = next_backoff(backoff);
285                    }
286                }
287            }
288        }
289        loop {
290            if let Some(event) = self
291                .read_event(process_id, event_type, after_sequence)
292                .await?
293            {
294                return Ok(event);
295            }
296            tokio::time::sleep(backoff).await;
297            backoff = next_backoff(backoff);
298        }
299    }
300
301    async fn read_terminal(
302        &self,
303        process_id: &str,
304    ) -> Result<Option<ProcessAwaitOutput>, PluginError> {
305        let record = self
306            .registry
307            .get_process(process_id)
308            .await
309            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
310        Ok(record.status.await_output().cloned())
311    }
312
313    async fn read_event(
314        &self,
315        process_id: &str,
316        event_type: &str,
317        after_sequence: u64,
318    ) -> Result<Option<ProcessEvent>, PluginError> {
319        Ok(self
320            .registry
321            .events_after(process_id, after_sequence)
322            .await?
323            .into_iter()
324            .find(|event| event.event_type == event_type))
325    }
326}
327
328fn next_backoff(current: Duration) -> Duration {
329    current.saturating_mul(2).min(AWAIT_BACKOFF_MAX)
330}
331
332#[async_trait::async_trait]
333pub trait ProcessAttach: Send + Sync {
334    async fn await_terminal(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
335}
336
337#[async_trait::async_trait]
338impl ProcessRegistry for WatchedProcessRegistry {
339    fn durability_tier(&self) -> crate::DurabilityTier {
340        self.inner.durability_tier()
341    }
342
343    async fn register_process(
344        &self,
345        registration: ProcessRegistration,
346    ) -> Result<ProcessRecord, PluginError> {
347        let process_id = registration.id.clone();
348        let record = self.inner.register_process(registration).await?;
349        self.hub.notify(&process_id);
350        Ok(record)
351    }
352
353    async fn set_external_ref(
354        &self,
355        process_id: &str,
356        external_ref: ProcessExternalRef,
357    ) -> Result<ProcessRecord, PluginError> {
358        let record = self
359            .inner
360            .set_external_ref(process_id, external_ref)
361            .await?;
362        self.hub.notify(process_id);
363        Ok(record)
364    }
365
366    async fn grant_handle(
367        &self,
368        session_scope: &SessionScope,
369        process_id: &str,
370        descriptor: ProcessHandleDescriptor,
371    ) -> Result<ProcessHandleGrant, PluginError> {
372        self.inner
373            .grant_handle(session_scope, process_id, descriptor)
374            .await
375    }
376
377    async fn revoke_handle(
378        &self,
379        session_scope: &SessionScope,
380        process_id: &str,
381    ) -> Result<(), PluginError> {
382        self.inner.revoke_handle(session_scope, process_id).await
383    }
384
385    async fn transfer_handle_grants(
386        &self,
387        from_scope: &SessionScope,
388        to_scope: &SessionScope,
389        process_ids: &[String],
390    ) -> Result<(), PluginError> {
391        self.inner
392            .transfer_handle_grants(from_scope, to_scope, process_ids)
393            .await
394    }
395
396    async fn list_handle_grants(
397        &self,
398        session_scope: &SessionScope,
399    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
400        self.inner.list_handle_grants(session_scope).await
401    }
402
403    async fn list_live_handle_grants(
404        &self,
405        session_scope: &SessionScope,
406    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
407        self.inner.list_live_handle_grants(session_scope).await
408    }
409
410    async fn has_handle_grant(
411        &self,
412        session_scope: &SessionScope,
413        process_id: &str,
414    ) -> Result<bool, PluginError> {
415        self.inner.has_handle_grant(session_scope, process_id).await
416    }
417
418    async fn handle_grants_for_process(
419        &self,
420        process_id: &str,
421    ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
422        self.inner.handle_grants_for_process(process_id).await
423    }
424
425    async fn delete_session_process_state(
426        &self,
427        session_id: &str,
428    ) -> Result<ProcessSessionDeleteReport, PluginError> {
429        self.inner.delete_session_process_state(session_id).await
430    }
431
432    async fn append_event(
433        &self,
434        process_id: &str,
435        request: ProcessEventAppendRequest,
436    ) -> Result<ProcessEventAppendResult, PluginError> {
437        let result = self.inner.append_event(process_id, request).await?;
438        self.hub.notify(process_id);
439        // Best-effort freshness after the durable append: the write already
440        // committed, so the sink cannot fail it. Terminal appends never reach
441        // here — `complete_process` writes them through the inner registry.
442        if let Some(sink) = self.sink.as_ref() {
443            sink.emit(&result.event).await;
444        }
445        Ok(result)
446    }
447
448    async fn events_after(
449        &self,
450        process_id: &str,
451        after_sequence: u64,
452    ) -> Result<Vec<ProcessEvent>, PluginError> {
453        self.inner.events_after(process_id, after_sequence).await
454    }
455
456    async fn count_events_through(
457        &self,
458        process_id: &str,
459        event_type: &str,
460        up_to_sequence: u64,
461    ) -> Result<u64, PluginError> {
462        self.inner
463            .count_events_through(process_id, event_type, up_to_sequence)
464            .await
465    }
466
467    async fn recent_events(
468        &self,
469        process_id: &str,
470        limit: usize,
471    ) -> Result<Vec<ProcessEvent>, PluginError> {
472        self.inner.recent_events(process_id, limit).await
473    }
474
475    async fn wake_events_after(
476        &self,
477        process_id: &str,
478        after_sequence: u64,
479    ) -> Result<Vec<ProcessEvent>, PluginError> {
480        self.inner
481            .wake_events_after(process_id, after_sequence)
482            .await
483    }
484
485    async fn complete_process(
486        &self,
487        process_id: &str,
488        await_output: ProcessAwaitOutput,
489    ) -> Result<ProcessRecord, PluginError> {
490        let record = self
491            .inner
492            .complete_process(process_id, await_output)
493            .await?;
494        self.hub.notify(process_id);
495        Ok(record)
496    }
497
498    async fn record_first_started(
499        &self,
500        process_id: &str,
501        started: ProcessStarted,
502    ) -> Result<ProcessRecord, PluginError> {
503        let record = self.inner.record_first_started(process_id, started).await?;
504        self.hub.notify(process_id);
505        Ok(record)
506    }
507
508    async fn request_process_abandon(
509        &self,
510        process_id: &str,
511        request: AbandonRequest,
512    ) -> Result<ProcessRecord, PluginError> {
513        let record = self
514            .inner
515            .request_process_abandon(process_id, request)
516            .await?;
517        self.hub.notify(process_id);
518        Ok(record)
519    }
520
521    async fn set_process_wait(
522        &self,
523        process_id: &str,
524        wait: WaitState,
525    ) -> Result<ProcessRecord, PluginError> {
526        let record = self.inner.set_process_wait(process_id, wait).await?;
527        self.hub.notify(process_id);
528        Ok(record)
529    }
530
531    async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
532        let record = self.inner.clear_process_wait(process_id).await?;
533        self.hub.notify(process_id);
534        Ok(record)
535    }
536
537    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
538        self.inner.get_process(process_id).await
539    }
540
541    async fn list_processes(
542        &self,
543        filter: &ProcessListFilter,
544    ) -> Result<Vec<ProcessRecord>, PluginError> {
545        self.inner.list_processes(filter).await
546    }
547
548    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
549        self.inner.ack_wake(process_id, sequence).await?;
550        self.hub.notify(process_id);
551        Ok(())
552    }
553
554    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
555        self.inner.list_non_terminal().await
556    }
557
558    async fn claim_process_lease(
559        &self,
560        process_id: &str,
561        owner: &crate::LeaseOwnerIdentity,
562        lease_ttl_ms: u64,
563    ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
564        self.inner
565            .claim_process_lease(process_id, owner, lease_ttl_ms)
566            .await
567    }
568
569    async fn reclaim_process_lease(
570        &self,
571        process_id: &str,
572        owner: &crate::LeaseOwnerIdentity,
573        observed_holder: &ProcessLease,
574        lease_ttl_ms: u64,
575    ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
576        self.inner
577            .reclaim_process_lease(process_id, owner, observed_holder, lease_ttl_ms)
578            .await
579    }
580
581    async fn renew_process_lease(
582        &self,
583        lease: &ProcessLease,
584        lease_ttl_ms: u64,
585    ) -> Result<ProcessLease, PluginError> {
586        self.inner.renew_process_lease(lease, lease_ttl_ms).await
587    }
588
589    async fn get_process_lease(
590        &self,
591        process_id: &str,
592    ) -> Result<Option<ProcessLease>, PluginError> {
593        self.inner.get_process_lease(process_id).await
594    }
595
596    async fn complete_process_lease(
597        &self,
598        completion: &ProcessLeaseCompletion,
599    ) -> Result<(), PluginError> {
600        self.inner.complete_process_lease(completion).await
601    }
602
603    async fn prune_terminal_processes(
604        &self,
605        cutoff_epoch_ms: u64,
606    ) -> Result<ProcessPruneReport, PluginError> {
607        // No hub bump: pruned rows are terminal, so any waiter on them resolved
608        // long ago (terminal state is durable and observed via the await seam).
609        self.inner.prune_terminal_processes(cutoff_epoch_ms).await
610    }
611}
612
613#[cfg(test)]
614mod tests {
615    use std::sync::Arc;
616
617    use super::*;
618    use crate::{
619        ProcessInput, ProcessProvenance, ProcessRegistration, TestLocalProcessRegistry, ToolControl,
620    };
621
622    fn registration(process_id: &str) -> ProcessRegistration {
623        ProcessRegistration::new(
624            process_id,
625            ProcessInput::External {
626                metadata: serde_json::json!({}),
627            },
628            crate::RecoveryDisposition::ExternallyOwned,
629            ProcessProvenance::host(),
630        )
631    }
632
633    fn plain_event_type(name: &str) -> crate::ProcessEventType {
634        crate::ProcessEventType {
635            name: name.to_string(),
636            payload_schema: crate::LashSchema::any(),
637            semantics: crate::ProcessEventSemanticsSpec::default(),
638        }
639    }
640
641    fn registration_with_events(process_id: &str, event_types: &[&str]) -> ProcessRegistration {
642        registration(process_id)
643            .with_extra_event_types(event_types.iter().map(|name| plain_event_type(name)))
644    }
645
646    /// Records `(event_type, sequence)` in emit order for sink assertions.
647    #[derive(Clone, Default)]
648    struct CollectingSink {
649        events: Arc<Mutex<Vec<(String, u64)>>>,
650    }
651
652    impl CollectingSink {
653        fn collected(&self) -> Vec<(String, u64)> {
654            self.events.lock().expect("sink lock").clone()
655        }
656    }
657
658    #[async_trait::async_trait]
659    impl ProcessEventSink for CollectingSink {
660        async fn emit(&self, event: &ProcessEvent) {
661            self.events
662                .lock()
663                .expect("sink lock")
664                .push((event.event_type.clone(), event.sequence));
665        }
666    }
667
668    fn success(value: serde_json::Value) -> ProcessAwaitOutput {
669        ProcessAwaitOutput::Success {
670            value,
671            control: None::<ToolControl>,
672        }
673    }
674
675    /// ADR 0016 pins the awaiter's polling cadence: a 25ms floor, doubling
676    /// backoff, and a 1s cap. Changing any of the three alters every store-only
677    /// deployment's wait economics, so the exact schedule is asserted here.
678    #[test]
679    fn backoff_schedule_has_25ms_floor_doubling_to_1s_cap() {
680        assert_eq!(AWAIT_BACKOFF_MIN, Duration::from_millis(25));
681        assert_eq!(AWAIT_BACKOFF_MAX, Duration::from_secs(1));
682
683        let mut backoff = AWAIT_BACKOFF_MIN;
684        let mut schedule = vec![backoff];
685        while backoff < AWAIT_BACKOFF_MAX {
686            backoff = next_backoff(backoff);
687            schedule.push(backoff);
688        }
689        assert_eq!(
690            schedule,
691            [25, 50, 100, 200, 400, 800, 1000]
692                .into_iter()
693                .map(Duration::from_millis)
694                .collect::<Vec<_>>(),
695            "the backoff doubles from the 25ms floor and saturates at the 1s cap"
696        );
697        assert_eq!(
698            next_backoff(AWAIT_BACKOFF_MAX),
699            AWAIT_BACKOFF_MAX,
700            "the cap is absorbing"
701        );
702    }
703
704    /// ADR 0017: the decorator delegates `prune_terminal_processes` without a
705    /// hub bump — pruned rows are terminal, so their waiters resolved long ago
706    /// and a tick would only wake unrelated subscribers spuriously.
707    #[tokio::test]
708    async fn prune_through_decorator_does_not_bump_the_hub() {
709        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
710        let (registry, hub) = watch_process_registry(raw);
711        registry
712            .register_process(registration("proc-terminal"))
713            .await
714            .expect("register terminal");
715        registry
716            .complete_process("proc-terminal", success(serde_json::json!("done")))
717            .await
718            .expect("complete");
719        registry
720            .register_process(registration("proc-live"))
721            .await
722            .expect("register live");
723
724        // Subscribe after the mutations above so only post-subscription bumps
725        // are observable.
726        let mut terminal_rx = hub.subscribe("proc-terminal");
727        let mut live_rx = hub.subscribe("proc-live");
728        terminal_rx.mark_unchanged();
729        live_rx.mark_unchanged();
730
731        let report = registry
732            .prune_terminal_processes(u64::MAX)
733            .await
734            .expect("prune");
735        assert_eq!(report.pruned_processes, 1, "the terminal process pruned");
736
737        assert!(
738            !terminal_rx.has_changed().expect("terminal sender open"),
739            "prune must not bump the pruned process's hub entry"
740        );
741        assert!(
742            !live_rx.has_changed().expect("live sender open"),
743            "prune must not bump surviving processes' hub entries"
744        );
745    }
746
747    #[tokio::test]
748    async fn hub_subscribe_then_notify_wakes_and_gc_drops_empty_entry() {
749        let hub = ProcessChangeHub::new();
750        let mut rx = hub.subscribe("proc");
751        hub.notify("proc");
752        tokio::time::timeout(Duration::from_millis(100), rx.changed())
753            .await
754            .expect("notify should wake")
755            .expect("sender remains open");
756
757        drop(rx);
758        hub.notify("proc");
759        assert_eq!(hub.tracked_processes(), 0);
760    }
761
762    #[tokio::test]
763    async fn await_event_returns_historical_event_immediately() {
764        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
765        let (registry, hub) = watch_process_registry(raw);
766        registry
767            .register_process(registration("proc"))
768            .await
769            .expect("register");
770        let appended = registry
771            .append_event(
772                "proc",
773                ProcessEventAppendRequest::cancel_requested("proc", Some("stop".to_string())),
774            )
775            .await
776            .expect("append");
777
778        let event = ProcessAwaiter::new(Arc::clone(&registry), hub)
779            .await_event("proc", "process.cancel_requested", 0)
780            .await
781            .expect("await event");
782        assert_eq!(event.sequence, appended.event.sequence);
783    }
784
785    #[tokio::test]
786    async fn await_terminal_unknown_process_errors() {
787        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
788        let err = ProcessAwaiter::polling(registry)
789            .await_terminal("missing")
790            .await
791            .expect_err("unknown process should error");
792        assert!(err.to_string().contains("unknown process `missing`"));
793    }
794
795    #[tokio::test]
796    async fn polling_awaiter_resolves_via_backoff() {
797        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
798        registry
799            .register_process(registration("proc"))
800            .await
801            .expect("register");
802        let writer = Arc::clone(&registry);
803        tokio::spawn(async move {
804            tokio::time::sleep(Duration::from_millis(10)).await;
805            writer
806                .complete_process("proc", success(serde_json::json!({ "ok": true })))
807                .await
808                .expect("complete");
809        });
810
811        let output = tokio::time::timeout(
812            Duration::from_secs(1),
813            ProcessAwaiter::polling(registry).await_terminal("proc"),
814        )
815        .await
816        .expect("polling await timeout")
817        .expect("await terminal");
818        assert_eq!(output, success(serde_json::json!({ "ok": true })));
819    }
820
821    #[tokio::test]
822    async fn watched_awaiter_observes_terminal_without_lost_wakeup() {
823        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
824        let (registry, hub) = watch_process_registry(raw);
825        registry
826            .register_process(registration("proc"))
827            .await
828            .expect("register");
829        let awaiter = ProcessAwaiter::new(Arc::clone(&registry), hub);
830        let waiter = tokio::spawn(async move { awaiter.await_terminal("proc").await });
831        registry
832            .complete_process("proc", success(serde_json::json!("done")))
833            .await
834            .expect("complete");
835
836        let output = tokio::time::timeout(Duration::from_millis(200), waiter)
837            .await
838            .expect("watched await timeout")
839            .expect("join")
840            .expect("await terminal");
841        assert_eq!(output, success(serde_json::json!("done")));
842    }
843
844    #[tokio::test]
845    async fn watched_registry_bumps_on_mutations() {
846        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
847        let (registry, hub) = watch_process_registry(raw);
848        let mut rx = hub.subscribe("proc");
849        registry
850            .register_process(registration("proc"))
851            .await
852            .expect("register");
853        tokio::time::timeout(Duration::from_millis(100), rx.changed())
854            .await
855            .expect("register bump")
856            .expect("sender remains open");
857
858        registry
859            .append_event(
860                "proc",
861                ProcessEventAppendRequest::cancel_requested("proc", None),
862            )
863            .await
864            .expect("append");
865        tokio::time::timeout(Duration::from_millis(100), rx.changed())
866            .await
867            .expect("append bump")
868            .expect("sender remains open");
869    }
870
871    #[tokio::test]
872    async fn sink_receives_appended_events_in_order() {
873        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
874        let sink = CollectingSink::default();
875        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
876        registry
877            .register_process(registration_with_events(
878                "proc",
879                &["producer.a", "producer.b"],
880            ))
881            .await
882            .expect("register");
883        registry
884            .append_event(
885                "proc",
886                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
887            )
888            .await
889            .expect("append a");
890        registry
891            .append_event(
892                "proc",
893                ProcessEventAppendRequest::new("producer.b", serde_json::json!({})),
894            )
895            .await
896            .expect("append b");
897
898        assert_eq!(
899            sink.collected(),
900            vec![("producer.a".to_string(), 1), ("producer.b".to_string(), 2)],
901            "the sink must observe appended events after their write, in append order"
902        );
903    }
904
905    #[tokio::test]
906    async fn sink_absent_leaves_appends_unchanged() {
907        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
908        let (registry, _hub) = watch_process_registry_with_sink(raw, None);
909        registry
910            .register_process(registration_with_events("proc", &["producer.a"]))
911            .await
912            .expect("register");
913        let appended = registry
914            .append_event(
915                "proc",
916                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
917            )
918            .await
919            .expect("append succeeds with no sink installed");
920        assert_eq!(appended.event.sequence, 1);
921    }
922
923    #[tokio::test]
924    async fn sink_not_invoked_for_complete_process_terminal_append() {
925        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
926        let sink = CollectingSink::default();
927        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
928        registry
929            .register_process(registration_with_events("proc", &["producer.a"]))
930            .await
931            .expect("register");
932        registry
933            .append_event(
934                "proc",
935                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
936            )
937            .await
938            .expect("explicit append");
939        registry
940            .complete_process("proc", success(serde_json::json!("done")))
941            .await
942            .expect("complete");
943
944        assert_eq!(
945            sink.collected(),
946            vec![("producer.a".to_string(), 1)],
947            "complete_process appends its terminal event through the inner registry, so the \
948             decorator never emits it to the sink"
949        );
950    }
951
952    #[tokio::test]
953    async fn sink_present_still_bumps_hub_on_append() {
954        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
955        let sink = CollectingSink::default();
956        let (registry, hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink)));
957        let mut rx = hub.subscribe("proc");
958        registry
959            .register_process(registration_with_events("proc", &["producer.a"]))
960            .await
961            .expect("register");
962        tokio::time::timeout(Duration::from_millis(100), rx.changed())
963            .await
964            .expect("register bump")
965            .expect("sender remains open");
966        registry
967            .append_event(
968                "proc",
969                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
970            )
971            .await
972            .expect("append");
973        tokio::time::timeout(Duration::from_millis(100), rx.changed())
974            .await
975            .expect("append bump with a sink installed")
976            .expect("sender remains open");
977    }
978
979    struct NoopRunHandle;
980
981    #[async_trait::async_trait]
982    impl crate::ProcessRunHandle for NoopRunHandle {
983        async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
984            Ok(())
985        }
986    }
987
988    struct PanicAttach;
989
990    #[async_trait::async_trait]
991    impl ProcessAttach for PanicAttach {
992        async fn await_terminal(
993            &self,
994            _process_id: &str,
995        ) -> Result<ProcessAwaitOutput, PluginError> {
996            panic!("attach should not be called for already-terminal process")
997        }
998    }
999
1000    struct ErrorAttach;
1001
1002    #[async_trait::async_trait]
1003    impl ProcessAttach for ErrorAttach {
1004        async fn await_terminal(
1005            &self,
1006            _process_id: &str,
1007        ) -> Result<ProcessAwaitOutput, PluginError> {
1008            Err(PluginError::Session("attach failed".to_string()))
1009        }
1010    }
1011
1012    #[tokio::test]
1013    async fn driver_short_circuits_terminal_before_attach() {
1014        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
1015        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
1016            .with_attach(Arc::new(PanicAttach));
1017        let registry = driver.process_registry();
1018        registry
1019            .register_process(registration("proc"))
1020            .await
1021            .expect("register");
1022        registry
1023            .complete_process("proc", success(serde_json::json!("ready")))
1024            .await
1025            .expect("complete");
1026
1027        let output = driver.await_terminal("proc").await.expect("await terminal");
1028        assert_eq!(output, success(serde_json::json!("ready")));
1029    }
1030
1031    #[tokio::test]
1032    async fn driver_attach_errors_propagate_without_poll_fallback() {
1033        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
1034        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
1035            .with_attach(Arc::new(ErrorAttach));
1036        driver
1037            .process_registry()
1038            .register_process(registration("proc"))
1039            .await
1040            .expect("register");
1041
1042        let err = driver
1043            .await_terminal("proc")
1044            .await
1045            .expect_err("attach error should propagate");
1046        assert!(err.to_string().contains("attach failed"));
1047    }
1048
1049    struct CountingAttach {
1050        calls: Arc<std::sync::atomic::AtomicUsize>,
1051    }
1052
1053    #[async_trait::async_trait]
1054    impl ProcessAttach for CountingAttach {
1055        async fn await_terminal(
1056            &self,
1057            _process_id: &str,
1058        ) -> Result<ProcessAwaitOutput, PluginError> {
1059            self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1060            Err(PluginError::Session(
1061                "attach must not be consulted for a terminal process".to_string(),
1062            ))
1063        }
1064    }
1065
1066    /// Sim-style race: many waiters attach to one process and completion fires
1067    /// while they are mid-flight between their subscribe and their first read.
1068    /// The change hub must resolve every one with identical output — no lost
1069    /// wakeups, no divergent results (ADR 0016).
1070    #[tokio::test]
1071    async fn concurrent_waiters_all_resolve_with_identical_output_on_completion() {
1072        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
1073        let (registry, hub) = watch_process_registry(raw);
1074        registry
1075            .register_process(registration("proc"))
1076            .await
1077            .expect("register");
1078
1079        const WAITERS: usize = 16;
1080        let barrier = Arc::new(tokio::sync::Barrier::new(WAITERS + 1));
1081        let mut waiters = Vec::with_capacity(WAITERS);
1082        for _ in 0..WAITERS {
1083            let awaiter = ProcessAwaiter::new(Arc::clone(&registry), hub.clone());
1084            let barrier = Arc::clone(&barrier);
1085            waiters.push(tokio::spawn(async move {
1086                barrier.wait().await;
1087                awaiter.await_terminal("proc").await
1088            }));
1089        }
1090        // Release every waiter, then complete at once so completion races their
1091        // first read and subscribe.
1092        barrier.wait().await;
1093        let output = success(serde_json::json!({ "raced": true }));
1094        registry
1095            .complete_process("proc", output.clone())
1096            .await
1097            .expect("complete");
1098
1099        for waiter in waiters {
1100            let resolved = tokio::time::timeout(Duration::from_secs(2), waiter)
1101                .await
1102                .expect("each racing waiter resolves under 2s")
1103                .expect("join waiter")
1104                .expect("await terminal");
1105            assert_eq!(
1106                resolved, output,
1107                "every concurrent waiter resolves with identical terminal output"
1108            );
1109        }
1110    }
1111
1112    /// Sim-style restart/re-attach: a process completes while no waiter is
1113    /// attached; a later `await_terminal` resolves instantly through the
1114    /// registry short-circuit and never consults the engine attach (ADR 0016 —
1115    /// the terminal point-read precedes any attach hand-off).
1116    #[tokio::test]
1117    async fn driver_reattach_after_terminal_short_circuits_without_engine_call() {
1118        use std::sync::atomic::Ordering;
1119
1120        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
1121        let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1122        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle)).with_attach(
1123            Arc::new(CountingAttach {
1124                calls: Arc::clone(&calls),
1125            }),
1126        );
1127        let registry = driver.process_registry();
1128        registry
1129            .register_process(registration("proc"))
1130            .await
1131            .expect("register");
1132        // Process reaches terminal with no waiter attached.
1133        let output = success(serde_json::json!("reattached"));
1134        registry
1135            .complete_process("proc", output.clone())
1136            .await
1137            .expect("complete");
1138
1139        // A later await resolves via the registry short-circuit, instantly.
1140        let start = std::time::Instant::now();
1141        let resolved = driver.await_terminal("proc").await.expect("await terminal");
1142        assert_eq!(resolved, output);
1143        assert_eq!(
1144            calls.load(Ordering::SeqCst),
1145            0,
1146            "a terminal short-circuit must never call the engine attach"
1147        );
1148        assert!(
1149            start.elapsed() < Duration::from_millis(500),
1150            "a short-circuit resolves without any backoff wait"
1151        );
1152    }
1153
1154    /// Records seen vs. dropped emit sequences, dropping even sequences to model
1155    /// best-effort push loss.
1156    #[derive(Clone, Default)]
1157    struct LossySink {
1158        seen: Arc<Mutex<Vec<u64>>>,
1159        dropped: Arc<Mutex<Vec<u64>>>,
1160    }
1161
1162    #[async_trait::async_trait]
1163    impl ProcessEventSink for LossySink {
1164        async fn emit(&self, event: &ProcessEvent) {
1165            if event.sequence.is_multiple_of(2) {
1166                self.dropped.lock().expect("sink lock").push(event.sequence);
1167            } else {
1168                self.seen.lock().expect("sink lock").push(event.sequence);
1169            }
1170        }
1171    }
1172
1173    /// Sim-style sink loss: a sink that drops a fraction of emits still leaves
1174    /// the durable log complete. Reconciling from `events_after` at terminal
1175    /// recovers every event the push feed missed — ADR 0017's "push loss never
1176    /// loses truth".
1177    #[tokio::test]
1178    async fn lossy_sink_still_reconciles_complete_log_from_events_after() {
1179        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
1180        let sink = LossySink::default();
1181        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
1182        registry
1183            .register_process(registration_with_events("proc", &["producer.step"]))
1184            .await
1185            .expect("register");
1186
1187        const EVENTS: u64 = 6;
1188        for _ in 0..EVENTS {
1189            registry
1190                .append_event(
1191                    "proc",
1192                    ProcessEventAppendRequest::new("producer.step", serde_json::json!({})),
1193                )
1194                .await
1195                .expect("append");
1196        }
1197        // The terminal event never rides the sink at all (ADR 0017): completion
1198        // observation is the await seam's job.
1199        registry
1200            .complete_process("proc", success(serde_json::json!("done")))
1201            .await
1202            .expect("complete");
1203
1204        // The push feed genuinely lost some events...
1205        assert!(
1206            !sink.dropped.lock().expect("sink lock").is_empty(),
1207            "the lossy sink must drop at least one emit for the scenario to be meaningful"
1208        );
1209        assert!(
1210            (sink.seen.lock().expect("sink lock").len() as u64) < EVENTS,
1211            "the sink observed fewer events than were appended"
1212        );
1213        // ...but the durable log is the complete, ordered truth.
1214        let reconciled = registry
1215            .events_after("proc", 0)
1216            .await
1217            .expect("events")
1218            .into_iter()
1219            .filter(|event| event.event_type == "producer.step")
1220            .map(|event| event.sequence)
1221            .collect::<Vec<_>>();
1222        assert_eq!(
1223            reconciled,
1224            (1..=EVENTS).collect::<Vec<_>>(),
1225            "events_after reconciles the complete non-terminal log despite push loss"
1226        );
1227    }
1228}