Skip to main content

lash_core/runtime/process/
awaiter.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tokio::sync::watch;
6
7use super::events::{
8    ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
9};
10use super::model::{
11    ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry,
12    ProcessLease, ProcessLeaseClaimOutcome, ProcessLeaseCompletion, ProcessListFilter,
13    ProcessRecord, ProcessRegistration, ProcessSessionDeleteReport, SessionScope, WaitState,
14};
15use super::registry::{ProcessPruneReport, ProcessRegistry};
16use crate::PluginError;
17
18const AWAIT_BACKOFF_MIN: Duration = Duration::from_millis(25);
19const AWAIT_BACKOFF_MAX: Duration = Duration::from_secs(1);
20
21#[derive(Clone, Default)]
22pub struct ProcessChangeHub {
23    inner: Arc<Mutex<HashMap<String, watch::Sender<u64>>>>,
24}
25
26impl ProcessChangeHub {
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    /// Subscribe before reading a process row. The receiver carries only a
32    /// version counter; waiters always re-read the registry after a bump.
33    pub fn subscribe(&self, process_id: &str) -> watch::Receiver<u64> {
34        let mut guard = self.inner.lock().expect("process change hub lock poisoned");
35        guard
36            .entry(process_id.to_string())
37            .or_insert_with(|| {
38                let (tx, _rx) = watch::channel(0);
39                tx
40            })
41            .subscribe()
42    }
43
44    pub fn notify(&self, process_id: &str) {
45        let mut guard = self.inner.lock().expect("process change hub lock poisoned");
46        let mut remove = false;
47        if let Some(tx) = guard.get(process_id) {
48            if tx.receiver_count() == 0 {
49                remove = true;
50            } else {
51                let next = (*tx.borrow()).wrapping_add(1);
52                if tx.send(next).is_err() {
53                    remove = true;
54                }
55            }
56        }
57        if remove {
58            guard.remove(process_id);
59        }
60    }
61
62    #[cfg(test)]
63    fn tracked_processes(&self) -> usize {
64        self.inner
65            .lock()
66            .expect("process change hub lock poisoned")
67            .len()
68    }
69}
70
71/// Host-facing, best-effort push of each appended process event.
72///
73/// A sink is an optional freshness feed, **never a source of truth.** The
74/// durable event log ([`ProcessRegistry::events_after`]) is the only complete
75/// record; a sink lets a host observe appends promptly without polling, but it
76/// makes no delivery promise.
77///
78/// # Contract
79///
80/// - **Best-effort freshness, never truth.** [`WatchedProcessRegistry`] calls
81///   [`emit`](Self::emit) after a successful `append_event`, in that pod's
82///   per-process append order. There is no buffering, no retry, and no
83///   delivery guarantee across pod crashes or restarts: an event that was
84///   appended durably may never reach the sink (e.g. the pod died between the
85///   durable write and the emit). Consumers that need completeness reconcile
86///   from `events_after` — the durable log is authoritative — typically at
87///   terminal time.
88/// - **Terminal events are deliberately NOT emitted through the sink.**
89///   [`ProcessRegistry::complete_process`] appends its terminal event via the
90///   *inner* registry internally, so the decorator never observes it as an
91///   `append_event` and never emits it. Do not wait on the sink for
92///   completion: terminal observation rides
93///   [`ProcessWorkDriver::await_terminal`](crate::ProcessWorkDriver::await_terminal)
94///   (see ADR 0016), which reads the durable terminal state.
95/// - **Emission cannot fail the write.** `emit` returns `()`, so a sink can
96///   never fail or roll back an append; the durable write has already
97///   committed by the time `emit` runs. But the decorator *awaits* `emit`
98///   inline on the append path, so a slow sink slows every append. Implementors
99///   must return fast: hand any real I/O off to a channel or background task
100///   internally rather than blocking inside `emit`.
101#[async_trait::async_trait]
102pub trait ProcessEventSink: Send + Sync {
103    /// Observe one appended process event. Best-effort; see the trait contract.
104    ///
105    /// Must be fast and non-blocking — offload I/O to a channel/task internally.
106    async fn emit(&self, event: &ProcessEvent);
107}
108
109/// [`ProcessRegistry`] decorator: publishes in-process change ticks on every
110/// mutation (so [`ProcessAwaiter`] wakes without polling) and, when a
111/// [`ProcessEventSink`] is installed, emits each appended event to it.
112///
113/// The sink is installed once at wrap time via
114/// [`watch_process_registry_with_sink`]; there is no post-hoc mutation and no
115/// double-wrapping.
116struct WatchedProcessRegistry {
117    inner: Arc<dyn ProcessRegistry>,
118    hub: ProcessChangeHub,
119    sink: Option<Arc<dyn ProcessEventSink>>,
120}
121
122/// Wrap `inner` in a [`WatchedProcessRegistry`] with no event sink.
123///
124/// The decorated handle publishes change ticks to the returned
125/// [`ProcessChangeHub`]. Use [`watch_process_registry_with_sink`] to also feed a
126/// host-facing [`ProcessEventSink`].
127pub fn watch_process_registry(
128    inner: Arc<dyn ProcessRegistry>,
129) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
130    watch_process_registry_with_sink(inner, None)
131}
132
133/// Wrap `inner` in a [`WatchedProcessRegistry`], optionally installing a
134/// [`ProcessEventSink`] that receives every appended event.
135///
136/// The sink is best-effort freshness, not truth — see [`ProcessEventSink`].
137pub fn watch_process_registry_with_sink(
138    inner: Arc<dyn ProcessRegistry>,
139    sink: Option<Arc<dyn ProcessEventSink>>,
140) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
141    let hub = ProcessChangeHub::new();
142    (
143        Arc::new(WatchedProcessRegistry {
144            inner,
145            hub: hub.clone(),
146            sink,
147        }),
148        hub,
149    )
150}
151
152#[derive(Clone)]
153pub struct ProcessAwaiter {
154    registry: Arc<dyn ProcessRegistry>,
155    hub: Option<ProcessChangeHub>,
156}
157
158impl ProcessAwaiter {
159    pub fn new(registry: Arc<dyn ProcessRegistry>, hub: ProcessChangeHub) -> Self {
160        Self {
161            registry,
162            hub: Some(hub),
163        }
164    }
165
166    pub fn polling(registry: Arc<dyn ProcessRegistry>) -> Self {
167        Self {
168            registry,
169            hub: None,
170        }
171    }
172
173    pub async fn await_terminal(
174        &self,
175        process_id: &str,
176    ) -> Result<ProcessAwaitOutput, PluginError> {
177        let mut backoff = AWAIT_BACKOFF_MIN;
178        if let Some(hub) = self.hub.as_ref() {
179            let mut rx = hub.subscribe(process_id);
180            loop {
181                if let Some(output) = self.read_terminal(process_id).await? {
182                    return Ok(output);
183                }
184                tokio::select! {
185                    changed = rx.changed() => {
186                        match changed {
187                            Ok(()) => backoff = AWAIT_BACKOFF_MIN,
188                            // Sender dropped (unreachable today given the hub
189                            // GC invariant, but latent): a dead receiver would
190                            // otherwise fire immediately on every loop turn.
191                            // Stop selecting on it and degrade to the
192                            // sleep-only backoff loop below.
193                            Err(_) => break,
194                        }
195                    }
196                    _ = tokio::time::sleep(backoff) => {
197                        backoff = next_backoff(backoff);
198                    }
199                }
200            }
201        }
202        loop {
203            if let Some(output) = self.read_terminal(process_id).await? {
204                return Ok(output);
205            }
206            tokio::time::sleep(backoff).await;
207            backoff = next_backoff(backoff);
208        }
209    }
210
211    pub async fn await_event(
212        &self,
213        process_id: &str,
214        event_type: &str,
215        after_sequence: u64,
216    ) -> Result<ProcessEvent, PluginError> {
217        let mut backoff = AWAIT_BACKOFF_MIN;
218        if let Some(hub) = self.hub.as_ref() {
219            let mut rx = hub.subscribe(process_id);
220            loop {
221                if let Some(event) = self
222                    .read_event(process_id, event_type, after_sequence)
223                    .await?
224                {
225                    return Ok(event);
226                }
227                tokio::select! {
228                    changed = rx.changed() => {
229                        match changed {
230                            Ok(()) => backoff = AWAIT_BACKOFF_MIN,
231                            // Sender dropped (unreachable today given the hub
232                            // GC invariant, but latent): a dead receiver would
233                            // otherwise fire immediately on every loop turn.
234                            // Stop selecting on it and degrade to the
235                            // sleep-only backoff loop below.
236                            Err(_) => break,
237                        }
238                    }
239                    _ = tokio::time::sleep(backoff) => {
240                        backoff = next_backoff(backoff);
241                    }
242                }
243            }
244        }
245        loop {
246            if let Some(event) = self
247                .read_event(process_id, event_type, after_sequence)
248                .await?
249            {
250                return Ok(event);
251            }
252            tokio::time::sleep(backoff).await;
253            backoff = next_backoff(backoff);
254        }
255    }
256
257    async fn read_terminal(
258        &self,
259        process_id: &str,
260    ) -> Result<Option<ProcessAwaitOutput>, PluginError> {
261        let record = self
262            .registry
263            .get_process(process_id)
264            .await
265            .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
266        Ok(record.status.await_output().cloned())
267    }
268
269    async fn read_event(
270        &self,
271        process_id: &str,
272        event_type: &str,
273        after_sequence: u64,
274    ) -> Result<Option<ProcessEvent>, PluginError> {
275        Ok(self
276            .registry
277            .events_after(process_id, after_sequence)
278            .await?
279            .into_iter()
280            .find(|event| event.event_type == event_type))
281    }
282}
283
284fn next_backoff(current: Duration) -> Duration {
285    current.saturating_mul(2).min(AWAIT_BACKOFF_MAX)
286}
287
288#[async_trait::async_trait]
289pub trait ProcessAttach: Send + Sync {
290    async fn await_terminal(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
291}
292
293#[async_trait::async_trait]
294impl ProcessRegistry for WatchedProcessRegistry {
295    fn durability_tier(&self) -> crate::DurabilityTier {
296        self.inner.durability_tier()
297    }
298
299    async fn register_process(
300        &self,
301        registration: ProcessRegistration,
302    ) -> Result<ProcessRecord, PluginError> {
303        let process_id = registration.id.clone();
304        let record = self.inner.register_process(registration).await?;
305        self.hub.notify(&process_id);
306        Ok(record)
307    }
308
309    async fn set_external_ref(
310        &self,
311        process_id: &str,
312        external_ref: ProcessExternalRef,
313    ) -> Result<ProcessRecord, PluginError> {
314        let record = self
315            .inner
316            .set_external_ref(process_id, external_ref)
317            .await?;
318        self.hub.notify(process_id);
319        Ok(record)
320    }
321
322    async fn grant_handle(
323        &self,
324        session_scope: &SessionScope,
325        process_id: &str,
326        descriptor: ProcessHandleDescriptor,
327    ) -> Result<ProcessHandleGrant, PluginError> {
328        self.inner
329            .grant_handle(session_scope, process_id, descriptor)
330            .await
331    }
332
333    async fn revoke_handle(
334        &self,
335        session_scope: &SessionScope,
336        process_id: &str,
337    ) -> Result<(), PluginError> {
338        self.inner.revoke_handle(session_scope, process_id).await
339    }
340
341    async fn transfer_handle_grants(
342        &self,
343        from_scope: &SessionScope,
344        to_scope: &SessionScope,
345        process_ids: &[String],
346    ) -> Result<(), PluginError> {
347        self.inner
348            .transfer_handle_grants(from_scope, to_scope, process_ids)
349            .await
350    }
351
352    async fn list_handle_grants(
353        &self,
354        session_scope: &SessionScope,
355    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
356        self.inner.list_handle_grants(session_scope).await
357    }
358
359    async fn list_live_handle_grants(
360        &self,
361        session_scope: &SessionScope,
362    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
363        self.inner.list_live_handle_grants(session_scope).await
364    }
365
366    async fn has_handle_grant(
367        &self,
368        session_scope: &SessionScope,
369        process_id: &str,
370    ) -> Result<bool, PluginError> {
371        self.inner.has_handle_grant(session_scope, process_id).await
372    }
373
374    async fn handle_grants_for_process(
375        &self,
376        process_id: &str,
377    ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
378        self.inner.handle_grants_for_process(process_id).await
379    }
380
381    async fn delete_session_process_state(
382        &self,
383        session_id: &str,
384    ) -> Result<ProcessSessionDeleteReport, PluginError> {
385        self.inner.delete_session_process_state(session_id).await
386    }
387
388    async fn append_event(
389        &self,
390        process_id: &str,
391        request: ProcessEventAppendRequest,
392    ) -> Result<ProcessEventAppendResult, PluginError> {
393        let result = self.inner.append_event(process_id, request).await?;
394        self.hub.notify(process_id);
395        // Best-effort freshness after the durable append: the write already
396        // committed, so the sink cannot fail it. Terminal appends never reach
397        // here — `complete_process` writes them through the inner registry.
398        if let Some(sink) = self.sink.as_ref() {
399            sink.emit(&result.event).await;
400        }
401        Ok(result)
402    }
403
404    async fn events_after(
405        &self,
406        process_id: &str,
407        after_sequence: u64,
408    ) -> Result<Vec<ProcessEvent>, PluginError> {
409        self.inner.events_after(process_id, after_sequence).await
410    }
411
412    async fn count_events_through(
413        &self,
414        process_id: &str,
415        event_type: &str,
416        up_to_sequence: u64,
417    ) -> Result<u64, PluginError> {
418        self.inner
419            .count_events_through(process_id, event_type, up_to_sequence)
420            .await
421    }
422
423    async fn recent_events(
424        &self,
425        process_id: &str,
426        limit: usize,
427    ) -> Result<Vec<ProcessEvent>, PluginError> {
428        self.inner.recent_events(process_id, limit).await
429    }
430
431    async fn wake_events_after(
432        &self,
433        process_id: &str,
434        after_sequence: u64,
435    ) -> Result<Vec<ProcessEvent>, PluginError> {
436        self.inner
437            .wake_events_after(process_id, after_sequence)
438            .await
439    }
440
441    async fn complete_process(
442        &self,
443        process_id: &str,
444        await_output: ProcessAwaitOutput,
445    ) -> Result<ProcessRecord, PluginError> {
446        let record = self
447            .inner
448            .complete_process(process_id, await_output)
449            .await?;
450        self.hub.notify(process_id);
451        Ok(record)
452    }
453
454    async fn set_process_wait(
455        &self,
456        process_id: &str,
457        wait: WaitState,
458    ) -> Result<ProcessRecord, PluginError> {
459        let record = self.inner.set_process_wait(process_id, wait).await?;
460        self.hub.notify(process_id);
461        Ok(record)
462    }
463
464    async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
465        let record = self.inner.clear_process_wait(process_id).await?;
466        self.hub.notify(process_id);
467        Ok(record)
468    }
469
470    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
471        self.inner.get_process(process_id).await
472    }
473
474    async fn list_processes(
475        &self,
476        filter: &ProcessListFilter,
477    ) -> Result<Vec<ProcessRecord>, PluginError> {
478        self.inner.list_processes(filter).await
479    }
480
481    async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
482        self.inner.ack_wake(process_id, sequence).await?;
483        self.hub.notify(process_id);
484        Ok(())
485    }
486
487    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
488        self.inner.list_non_terminal().await
489    }
490
491    async fn claim_process_lease(
492        &self,
493        process_id: &str,
494        owner: &crate::LeaseOwnerIdentity,
495        lease_ttl_ms: u64,
496    ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
497        self.inner
498            .claim_process_lease(process_id, owner, lease_ttl_ms)
499            .await
500    }
501
502    async fn reclaim_process_lease(
503        &self,
504        process_id: &str,
505        owner: &crate::LeaseOwnerIdentity,
506        observed_holder: &ProcessLease,
507        lease_ttl_ms: u64,
508    ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
509        self.inner
510            .reclaim_process_lease(process_id, owner, observed_holder, lease_ttl_ms)
511            .await
512    }
513
514    async fn renew_process_lease(
515        &self,
516        lease: &ProcessLease,
517        lease_ttl_ms: u64,
518    ) -> Result<ProcessLease, PluginError> {
519        self.inner.renew_process_lease(lease, lease_ttl_ms).await
520    }
521
522    async fn complete_process_lease(
523        &self,
524        completion: &ProcessLeaseCompletion,
525    ) -> Result<(), PluginError> {
526        self.inner.complete_process_lease(completion).await
527    }
528
529    async fn prune_terminal_processes(
530        &self,
531        cutoff_epoch_ms: u64,
532    ) -> Result<ProcessPruneReport, PluginError> {
533        // No hub bump: pruned rows are terminal, so any waiter on them resolved
534        // long ago (terminal state is durable and observed via the await seam).
535        self.inner.prune_terminal_processes(cutoff_epoch_ms).await
536    }
537}
538
539#[cfg(test)]
540mod tests {
541    use std::sync::Arc;
542
543    use super::*;
544    use crate::{
545        ProcessInput, ProcessProvenance, ProcessRegistration, TestLocalProcessRegistry, ToolControl,
546    };
547
548    fn registration(process_id: &str) -> ProcessRegistration {
549        ProcessRegistration::new(
550            process_id,
551            ProcessInput::External {
552                metadata: serde_json::json!({}),
553            },
554            ProcessProvenance::host(),
555        )
556    }
557
558    fn plain_event_type(name: &str) -> crate::ProcessEventType {
559        crate::ProcessEventType {
560            name: name.to_string(),
561            payload_schema: crate::LashSchema::any(),
562            semantics: crate::ProcessEventSemanticsSpec::default(),
563        }
564    }
565
566    fn registration_with_events(process_id: &str, event_types: &[&str]) -> ProcessRegistration {
567        registration(process_id)
568            .with_extra_event_types(event_types.iter().map(|name| plain_event_type(name)))
569    }
570
571    /// Records `(event_type, sequence)` in emit order for sink assertions.
572    #[derive(Clone, Default)]
573    struct CollectingSink {
574        events: Arc<Mutex<Vec<(String, u64)>>>,
575    }
576
577    impl CollectingSink {
578        fn collected(&self) -> Vec<(String, u64)> {
579            self.events.lock().expect("sink lock").clone()
580        }
581    }
582
583    #[async_trait::async_trait]
584    impl ProcessEventSink for CollectingSink {
585        async fn emit(&self, event: &ProcessEvent) {
586            self.events
587                .lock()
588                .expect("sink lock")
589                .push((event.event_type.clone(), event.sequence));
590        }
591    }
592
593    fn success(value: serde_json::Value) -> ProcessAwaitOutput {
594        ProcessAwaitOutput::Success {
595            value,
596            control: None::<ToolControl>,
597        }
598    }
599
600    #[tokio::test]
601    async fn hub_subscribe_then_notify_wakes_and_gc_drops_empty_entry() {
602        let hub = ProcessChangeHub::new();
603        let mut rx = hub.subscribe("proc");
604        hub.notify("proc");
605        tokio::time::timeout(Duration::from_millis(100), rx.changed())
606            .await
607            .expect("notify should wake")
608            .expect("sender remains open");
609
610        drop(rx);
611        hub.notify("proc");
612        assert_eq!(hub.tracked_processes(), 0);
613    }
614
615    #[tokio::test]
616    async fn await_event_returns_historical_event_immediately() {
617        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
618        let (registry, hub) = watch_process_registry(raw);
619        registry
620            .register_process(registration("proc"))
621            .await
622            .expect("register");
623        let appended = registry
624            .append_event(
625                "proc",
626                ProcessEventAppendRequest::cancel_requested("proc", Some("stop".to_string())),
627            )
628            .await
629            .expect("append");
630
631        let event = ProcessAwaiter::new(Arc::clone(&registry), hub)
632            .await_event("proc", "process.cancel_requested", 0)
633            .await
634            .expect("await event");
635        assert_eq!(event.sequence, appended.event.sequence);
636    }
637
638    #[tokio::test]
639    async fn await_terminal_unknown_process_errors() {
640        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
641        let err = ProcessAwaiter::polling(registry)
642            .await_terminal("missing")
643            .await
644            .expect_err("unknown process should error");
645        assert!(err.to_string().contains("unknown process `missing`"));
646    }
647
648    #[tokio::test]
649    async fn polling_awaiter_resolves_via_backoff() {
650        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
651        registry
652            .register_process(registration("proc"))
653            .await
654            .expect("register");
655        let writer = Arc::clone(&registry);
656        tokio::spawn(async move {
657            tokio::time::sleep(Duration::from_millis(10)).await;
658            writer
659                .complete_process("proc", success(serde_json::json!({ "ok": true })))
660                .await
661                .expect("complete");
662        });
663
664        let output = tokio::time::timeout(
665            Duration::from_secs(1),
666            ProcessAwaiter::polling(registry).await_terminal("proc"),
667        )
668        .await
669        .expect("polling await timeout")
670        .expect("await terminal");
671        assert_eq!(output, success(serde_json::json!({ "ok": true })));
672    }
673
674    #[tokio::test]
675    async fn watched_awaiter_observes_terminal_without_lost_wakeup() {
676        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
677        let (registry, hub) = watch_process_registry(raw);
678        registry
679            .register_process(registration("proc"))
680            .await
681            .expect("register");
682        let awaiter = ProcessAwaiter::new(Arc::clone(&registry), hub);
683        let waiter = tokio::spawn(async move { awaiter.await_terminal("proc").await });
684        registry
685            .complete_process("proc", success(serde_json::json!("done")))
686            .await
687            .expect("complete");
688
689        let output = tokio::time::timeout(Duration::from_millis(200), waiter)
690            .await
691            .expect("watched await timeout")
692            .expect("join")
693            .expect("await terminal");
694        assert_eq!(output, success(serde_json::json!("done")));
695    }
696
697    #[tokio::test]
698    async fn watched_registry_bumps_on_mutations() {
699        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
700        let (registry, hub) = watch_process_registry(raw);
701        let mut rx = hub.subscribe("proc");
702        registry
703            .register_process(registration("proc"))
704            .await
705            .expect("register");
706        tokio::time::timeout(Duration::from_millis(100), rx.changed())
707            .await
708            .expect("register bump")
709            .expect("sender remains open");
710
711        registry
712            .append_event(
713                "proc",
714                ProcessEventAppendRequest::cancel_requested("proc", None),
715            )
716            .await
717            .expect("append");
718        tokio::time::timeout(Duration::from_millis(100), rx.changed())
719            .await
720            .expect("append bump")
721            .expect("sender remains open");
722    }
723
724    #[tokio::test]
725    async fn sink_receives_appended_events_in_order() {
726        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
727        let sink = CollectingSink::default();
728        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
729        registry
730            .register_process(registration_with_events(
731                "proc",
732                &["producer.a", "producer.b"],
733            ))
734            .await
735            .expect("register");
736        registry
737            .append_event(
738                "proc",
739                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
740            )
741            .await
742            .expect("append a");
743        registry
744            .append_event(
745                "proc",
746                ProcessEventAppendRequest::new("producer.b", serde_json::json!({})),
747            )
748            .await
749            .expect("append b");
750
751        assert_eq!(
752            sink.collected(),
753            vec![("producer.a".to_string(), 1), ("producer.b".to_string(), 2)],
754            "the sink must observe appended events after their write, in append order"
755        );
756    }
757
758    #[tokio::test]
759    async fn sink_absent_leaves_appends_unchanged() {
760        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
761        let (registry, _hub) = watch_process_registry_with_sink(raw, None);
762        registry
763            .register_process(registration_with_events("proc", &["producer.a"]))
764            .await
765            .expect("register");
766        let appended = registry
767            .append_event(
768                "proc",
769                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
770            )
771            .await
772            .expect("append succeeds with no sink installed");
773        assert_eq!(appended.event.sequence, 1);
774    }
775
776    #[tokio::test]
777    async fn sink_not_invoked_for_complete_process_terminal_append() {
778        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
779        let sink = CollectingSink::default();
780        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
781        registry
782            .register_process(registration_with_events("proc", &["producer.a"]))
783            .await
784            .expect("register");
785        registry
786            .append_event(
787                "proc",
788                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
789            )
790            .await
791            .expect("explicit append");
792        registry
793            .complete_process("proc", success(serde_json::json!("done")))
794            .await
795            .expect("complete");
796
797        assert_eq!(
798            sink.collected(),
799            vec![("producer.a".to_string(), 1)],
800            "complete_process appends its terminal event through the inner registry, so the \
801             decorator never emits it to the sink"
802        );
803    }
804
805    #[tokio::test]
806    async fn sink_present_still_bumps_hub_on_append() {
807        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
808        let sink = CollectingSink::default();
809        let (registry, hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink)));
810        let mut rx = hub.subscribe("proc");
811        registry
812            .register_process(registration_with_events("proc", &["producer.a"]))
813            .await
814            .expect("register");
815        tokio::time::timeout(Duration::from_millis(100), rx.changed())
816            .await
817            .expect("register bump")
818            .expect("sender remains open");
819        registry
820            .append_event(
821                "proc",
822                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
823            )
824            .await
825            .expect("append");
826        tokio::time::timeout(Duration::from_millis(100), rx.changed())
827            .await
828            .expect("append bump with a sink installed")
829            .expect("sender remains open");
830    }
831
832    struct NoopRunHandle;
833
834    #[async_trait::async_trait]
835    impl crate::ProcessRunHandle for NoopRunHandle {
836        async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
837            Ok(())
838        }
839    }
840
841    struct PanicAttach;
842
843    #[async_trait::async_trait]
844    impl ProcessAttach for PanicAttach {
845        async fn await_terminal(
846            &self,
847            _process_id: &str,
848        ) -> Result<ProcessAwaitOutput, PluginError> {
849            panic!("attach should not be called for already-terminal process")
850        }
851    }
852
853    struct ErrorAttach;
854
855    #[async_trait::async_trait]
856    impl ProcessAttach for ErrorAttach {
857        async fn await_terminal(
858            &self,
859            _process_id: &str,
860        ) -> Result<ProcessAwaitOutput, PluginError> {
861            Err(PluginError::Session("attach failed".to_string()))
862        }
863    }
864
865    #[tokio::test]
866    async fn driver_short_circuits_terminal_before_attach() {
867        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
868        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
869            .with_attach(Arc::new(PanicAttach));
870        let registry = driver.process_registry();
871        registry
872            .register_process(registration("proc"))
873            .await
874            .expect("register");
875        registry
876            .complete_process("proc", success(serde_json::json!("ready")))
877            .await
878            .expect("complete");
879
880        let output = driver.await_terminal("proc").await.expect("await terminal");
881        assert_eq!(output, success(serde_json::json!("ready")));
882    }
883
884    #[tokio::test]
885    async fn driver_attach_errors_propagate_without_poll_fallback() {
886        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
887        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
888            .with_attach(Arc::new(ErrorAttach));
889        driver
890            .process_registry()
891            .register_process(registration("proc"))
892            .await
893            .expect("register");
894
895        let err = driver
896            .await_terminal("proc")
897            .await
898            .expect_err("attach error should propagate");
899        assert!(err.to_string().contains("attach failed"));
900    }
901}