1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tokio::sync::watch;
6
7use super::events::{
8 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
9};
10use super::model::{
11 AbandonRequest, ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant,
12 ProcessHandleGrantEntry, ProcessLease, ProcessLeaseClaimOutcome, ProcessLeaseCompletion,
13 ProcessListFilter, ProcessRecord, ProcessRegistration, ProcessSessionDeleteReport,
14 ProcessStarted, SessionScope, WaitState,
15};
16use super::registry::{ProcessPruneReport, ProcessRegistry};
17use crate::PluginError;
18
/// Starting delay between registry re-reads while awaiting; the awaiter also
/// resets its delay to this value whenever the change hub reports a change.
const AWAIT_BACKOFF_MIN: Duration = Duration::from_millis(25);
/// Upper bound that `next_backoff` clamps the re-read delay to.
const AWAIT_BACKOFF_MAX: Duration = Duration::from_secs(1);
21
/// Per-process change signal shared between a registry decorator and awaiters.
///
/// Each tracked process id maps to a `tokio::sync::watch` sender carrying a
/// `u64` counter. Cloning the hub clones the `Arc`, so every clone observes and
/// mutates the same map.
#[derive(Clone, Default)]
pub struct ProcessChangeHub {
    // Keyed by process id; the `u64` payload is only a change counter.
    inner: Arc<Mutex<HashMap<String, watch::Sender<u64>>>>,
}
26
27impl ProcessChangeHub {
28 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn subscribe(&self, process_id: &str) -> watch::Receiver<u64> {
35 let mut guard = self.inner.lock().expect("process change hub lock poisoned");
36 guard
37 .entry(process_id.to_string())
38 .or_insert_with(|| {
39 let (tx, _rx) = watch::channel(0);
40 tx
41 })
42 .subscribe()
43 }
44
45 pub fn notify(&self, process_id: &str) {
46 let mut guard = self.inner.lock().expect("process change hub lock poisoned");
47 let mut remove = false;
48 if let Some(tx) = guard.get(process_id) {
49 if tx.receiver_count() == 0 {
50 remove = true;
51 } else {
52 let next = (*tx.borrow()).wrapping_add(1);
53 if tx.send(next).is_err() {
54 remove = true;
55 }
56 }
57 }
58 if remove {
59 guard.remove(process_id);
60 }
61 }
62
63 #[cfg(test)]
64 fn tracked_processes(&self) -> usize {
65 self.inner
66 .lock()
67 .expect("process change hub lock poisoned")
68 .len()
69 }
70}
71
/// Push-style observer for process events.
///
/// Within this file the only caller is `WatchedProcessRegistry::append_event`,
/// which invokes `emit` after the inner append has succeeded and the change hub
/// has been notified. `emit` returns nothing, so a sink has no way to fail or
/// veto the append that triggered it.
#[async_trait::async_trait]
pub trait ProcessEventSink: Send + Sync {
    /// Receives one appended event.
    async fn emit(&self, event: &ProcessEvent);
}
133
/// Registry decorator that forwards every call to `inner` and, for selected
/// mutating calls, bumps `hub` and (for `append_event`) feeds `sink`.
struct WatchedProcessRegistry {
    // The wrapped registry that actually stores state.
    inner: Arc<dyn ProcessRegistry>,
    // Change hub bumped after successful mutations.
    hub: ProcessChangeHub,
    // Optional observer for events appended through `append_event`.
    sink: Option<Arc<dyn ProcessEventSink>>,
}
146
147pub fn watch_process_registry(
153 inner: Arc<dyn ProcessRegistry>,
154) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
155 watch_process_registry_with_sink(inner, None)
156}
157
158pub fn watch_process_registry_with_sink(
163 inner: Arc<dyn ProcessRegistry>,
164 sink: Option<Arc<dyn ProcessEventSink>>,
165) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
166 let hub = ProcessChangeHub::new();
167 (
168 Arc::new(WatchedProcessRegistry {
169 inner,
170 hub: hub.clone(),
171 sink,
172 }),
173 hub,
174 )
175}
176
/// Waits for a process to reach a terminal state or to log a given event by
/// re-reading a registry, optionally woken early by a [`ProcessChangeHub`].
#[derive(Clone)]
pub struct ProcessAwaiter {
    // Source of truth that is re-read on every wake-up.
    registry: Arc<dyn ProcessRegistry>,
    // `None` means pure polling with backoff; `Some` adds hub-driven wake-ups.
    hub: Option<ProcessChangeHub>,
}
190
191impl ProcessAwaiter {
192 pub fn new(registry: Arc<dyn ProcessRegistry>, hub: ProcessChangeHub) -> Self {
196 Self {
197 registry,
198 hub: Some(hub),
199 }
200 }
201
202 pub fn polling(registry: Arc<dyn ProcessRegistry>) -> Self {
206 Self {
207 registry,
208 hub: None,
209 }
210 }
211
212 pub async fn await_terminal(
216 &self,
217 process_id: &str,
218 ) -> Result<ProcessAwaitOutput, PluginError> {
219 let mut backoff = AWAIT_BACKOFF_MIN;
220 if let Some(hub) = self.hub.as_ref() {
221 let mut rx = hub.subscribe(process_id);
222 loop {
223 if let Some(output) = self.read_terminal(process_id).await? {
224 return Ok(output);
225 }
226 tokio::select! {
227 changed = rx.changed() => {
228 match changed {
229 Ok(()) => backoff = AWAIT_BACKOFF_MIN,
230 Err(_) => break,
236 }
237 }
238 _ = tokio::time::sleep(backoff) => {
239 backoff = next_backoff(backoff);
240 }
241 }
242 }
243 }
244 loop {
245 if let Some(output) = self.read_terminal(process_id).await? {
246 return Ok(output);
247 }
248 tokio::time::sleep(backoff).await;
249 backoff = next_backoff(backoff);
250 }
251 }
252
253 pub async fn await_event(
256 &self,
257 process_id: &str,
258 event_type: &str,
259 after_sequence: u64,
260 ) -> Result<ProcessEvent, PluginError> {
261 let mut backoff = AWAIT_BACKOFF_MIN;
262 if let Some(hub) = self.hub.as_ref() {
263 let mut rx = hub.subscribe(process_id);
264 loop {
265 if let Some(event) = self
266 .read_event(process_id, event_type, after_sequence)
267 .await?
268 {
269 return Ok(event);
270 }
271 tokio::select! {
272 changed = rx.changed() => {
273 match changed {
274 Ok(()) => backoff = AWAIT_BACKOFF_MIN,
275 Err(_) => break,
281 }
282 }
283 _ = tokio::time::sleep(backoff) => {
284 backoff = next_backoff(backoff);
285 }
286 }
287 }
288 }
289 loop {
290 if let Some(event) = self
291 .read_event(process_id, event_type, after_sequence)
292 .await?
293 {
294 return Ok(event);
295 }
296 tokio::time::sleep(backoff).await;
297 backoff = next_backoff(backoff);
298 }
299 }
300
301 async fn read_terminal(
302 &self,
303 process_id: &str,
304 ) -> Result<Option<ProcessAwaitOutput>, PluginError> {
305 let record = self
306 .registry
307 .get_process(process_id)
308 .await
309 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
310 Ok(record.status.await_output().cloned())
311 }
312
313 async fn read_event(
314 &self,
315 process_id: &str,
316 event_type: &str,
317 after_sequence: u64,
318 ) -> Result<Option<ProcessEvent>, PluginError> {
319 Ok(self
320 .registry
321 .events_after(process_id, after_sequence)
322 .await?
323 .into_iter()
324 .find(|event| event.event_type == event_type))
325 }
326}
327
328fn next_backoff(current: Duration) -> Duration {
329 current.saturating_mul(2).min(AWAIT_BACKOFF_MAX)
330}
331
/// Abstraction over "wait until this process is terminal and hand back its
/// output".
///
/// The only implementers visible in this file are test doubles; how production
/// code consumes this trait is not shown here.
#[async_trait::async_trait]
pub trait ProcessAttach: Send + Sync {
    /// Waits for `process_id` to finish and returns its await output, or an
    /// error if the wait fails.
    async fn await_terminal(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
}
336
337#[async_trait::async_trait]
338impl ProcessRegistry for WatchedProcessRegistry {
339 fn durability_tier(&self) -> crate::DurabilityTier {
340 self.inner.durability_tier()
341 }
342
343 async fn register_process(
344 &self,
345 registration: ProcessRegistration,
346 ) -> Result<ProcessRecord, PluginError> {
347 let process_id = registration.id.clone();
348 let record = self.inner.register_process(registration).await?;
349 self.hub.notify(&process_id);
350 Ok(record)
351 }
352
353 async fn set_external_ref(
354 &self,
355 process_id: &str,
356 external_ref: ProcessExternalRef,
357 ) -> Result<ProcessRecord, PluginError> {
358 let record = self
359 .inner
360 .set_external_ref(process_id, external_ref)
361 .await?;
362 self.hub.notify(process_id);
363 Ok(record)
364 }
365
366 async fn grant_handle(
367 &self,
368 session_scope: &SessionScope,
369 process_id: &str,
370 descriptor: ProcessHandleDescriptor,
371 ) -> Result<ProcessHandleGrant, PluginError> {
372 self.inner
373 .grant_handle(session_scope, process_id, descriptor)
374 .await
375 }
376
377 async fn revoke_handle(
378 &self,
379 session_scope: &SessionScope,
380 process_id: &str,
381 ) -> Result<(), PluginError> {
382 self.inner.revoke_handle(session_scope, process_id).await
383 }
384
385 async fn transfer_handle_grants(
386 &self,
387 from_scope: &SessionScope,
388 to_scope: &SessionScope,
389 process_ids: &[String],
390 ) -> Result<(), PluginError> {
391 self.inner
392 .transfer_handle_grants(from_scope, to_scope, process_ids)
393 .await
394 }
395
396 async fn list_handle_grants(
397 &self,
398 session_scope: &SessionScope,
399 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
400 self.inner.list_handle_grants(session_scope).await
401 }
402
403 async fn list_live_handle_grants(
404 &self,
405 session_scope: &SessionScope,
406 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
407 self.inner.list_live_handle_grants(session_scope).await
408 }
409
410 async fn has_handle_grant(
411 &self,
412 session_scope: &SessionScope,
413 process_id: &str,
414 ) -> Result<bool, PluginError> {
415 self.inner.has_handle_grant(session_scope, process_id).await
416 }
417
418 async fn handle_grants_for_process(
419 &self,
420 process_id: &str,
421 ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
422 self.inner.handle_grants_for_process(process_id).await
423 }
424
425 async fn delete_session_process_state(
426 &self,
427 session_id: &str,
428 ) -> Result<ProcessSessionDeleteReport, PluginError> {
429 self.inner.delete_session_process_state(session_id).await
430 }
431
432 async fn append_event(
433 &self,
434 process_id: &str,
435 request: ProcessEventAppendRequest,
436 ) -> Result<ProcessEventAppendResult, PluginError> {
437 let result = self.inner.append_event(process_id, request).await?;
438 self.hub.notify(process_id);
439 if let Some(sink) = self.sink.as_ref() {
443 sink.emit(&result.event).await;
444 }
445 Ok(result)
446 }
447
448 async fn events_after(
449 &self,
450 process_id: &str,
451 after_sequence: u64,
452 ) -> Result<Vec<ProcessEvent>, PluginError> {
453 self.inner.events_after(process_id, after_sequence).await
454 }
455
456 async fn count_events_through(
457 &self,
458 process_id: &str,
459 event_type: &str,
460 up_to_sequence: u64,
461 ) -> Result<u64, PluginError> {
462 self.inner
463 .count_events_through(process_id, event_type, up_to_sequence)
464 .await
465 }
466
467 async fn recent_events(
468 &self,
469 process_id: &str,
470 limit: usize,
471 ) -> Result<Vec<ProcessEvent>, PluginError> {
472 self.inner.recent_events(process_id, limit).await
473 }
474
475 async fn wake_events_after(
476 &self,
477 process_id: &str,
478 after_sequence: u64,
479 ) -> Result<Vec<ProcessEvent>, PluginError> {
480 self.inner
481 .wake_events_after(process_id, after_sequence)
482 .await
483 }
484
485 async fn complete_process(
486 &self,
487 process_id: &str,
488 await_output: ProcessAwaitOutput,
489 ) -> Result<ProcessRecord, PluginError> {
490 let record = self
491 .inner
492 .complete_process(process_id, await_output)
493 .await?;
494 self.hub.notify(process_id);
495 Ok(record)
496 }
497
498 async fn record_first_started(
499 &self,
500 process_id: &str,
501 started: ProcessStarted,
502 ) -> Result<ProcessRecord, PluginError> {
503 let record = self.inner.record_first_started(process_id, started).await?;
504 self.hub.notify(process_id);
505 Ok(record)
506 }
507
508 async fn request_process_abandon(
509 &self,
510 process_id: &str,
511 request: AbandonRequest,
512 ) -> Result<ProcessRecord, PluginError> {
513 let record = self
514 .inner
515 .request_process_abandon(process_id, request)
516 .await?;
517 self.hub.notify(process_id);
518 Ok(record)
519 }
520
521 async fn set_process_wait(
522 &self,
523 process_id: &str,
524 wait: WaitState,
525 ) -> Result<ProcessRecord, PluginError> {
526 let record = self.inner.set_process_wait(process_id, wait).await?;
527 self.hub.notify(process_id);
528 Ok(record)
529 }
530
531 async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
532 let record = self.inner.clear_process_wait(process_id).await?;
533 self.hub.notify(process_id);
534 Ok(record)
535 }
536
537 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
538 self.inner.get_process(process_id).await
539 }
540
541 async fn list_processes(
542 &self,
543 filter: &ProcessListFilter,
544 ) -> Result<Vec<ProcessRecord>, PluginError> {
545 self.inner.list_processes(filter).await
546 }
547
548 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
549 self.inner.ack_wake(process_id, sequence).await?;
550 self.hub.notify(process_id);
551 Ok(())
552 }
553
554 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
555 self.inner.list_non_terminal().await
556 }
557
558 async fn claim_process_lease(
559 &self,
560 process_id: &str,
561 owner: &crate::LeaseOwnerIdentity,
562 lease_ttl_ms: u64,
563 ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
564 self.inner
565 .claim_process_lease(process_id, owner, lease_ttl_ms)
566 .await
567 }
568
569 async fn reclaim_process_lease(
570 &self,
571 process_id: &str,
572 owner: &crate::LeaseOwnerIdentity,
573 observed_holder: &ProcessLease,
574 lease_ttl_ms: u64,
575 ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
576 self.inner
577 .reclaim_process_lease(process_id, owner, observed_holder, lease_ttl_ms)
578 .await
579 }
580
581 async fn renew_process_lease(
582 &self,
583 lease: &ProcessLease,
584 lease_ttl_ms: u64,
585 ) -> Result<ProcessLease, PluginError> {
586 self.inner.renew_process_lease(lease, lease_ttl_ms).await
587 }
588
589 async fn get_process_lease(
590 &self,
591 process_id: &str,
592 ) -> Result<Option<ProcessLease>, PluginError> {
593 self.inner.get_process_lease(process_id).await
594 }
595
596 async fn complete_process_lease(
597 &self,
598 completion: &ProcessLeaseCompletion,
599 ) -> Result<(), PluginError> {
600 self.inner.complete_process_lease(completion).await
601 }
602
603 async fn prune_terminal_processes(
604 &self,
605 cutoff_epoch_ms: u64,
606 ) -> Result<ProcessPruneReport, PluginError> {
607 self.inner.prune_terminal_processes(cutoff_epoch_ms).await
610 }
611}
612
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        ProcessInput, ProcessProvenance, ProcessRegistration, TestLocalProcessRegistry, ToolControl,
    };

    /// Minimal externally-owned registration with empty metadata.
    fn registration(process_id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: serde_json::json!({}),
            },
            crate::RecoveryDisposition::ExternallyOwned,
            ProcessProvenance::host(),
        )
    }

    /// Event type named `name` with an any-payload schema and default semantics.
    fn plain_event_type(name: &str) -> crate::ProcessEventType {
        crate::ProcessEventType {
            name: name.to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: crate::ProcessEventSemanticsSpec::default(),
        }
    }

    /// `registration` extended with the given extra event types.
    fn registration_with_events(process_id: &str, event_types: &[&str]) -> ProcessRegistration {
        registration(process_id)
            .with_extra_event_types(event_types.iter().map(|name| plain_event_type(name)))
    }

    /// Sink that records `(event_type, sequence)` for every emitted event.
    #[derive(Clone, Default)]
    struct CollectingSink {
        events: Arc<Mutex<Vec<(String, u64)>>>,
    }

    impl CollectingSink {
        /// Snapshot of everything emitted so far, in emit order.
        fn collected(&self) -> Vec<(String, u64)> {
            self.events.lock().expect("sink lock").clone()
        }
    }

    #[async_trait::async_trait]
    impl ProcessEventSink for CollectingSink {
        async fn emit(&self, event: &ProcessEvent) {
            self.events
                .lock()
                .expect("sink lock")
                .push((event.event_type.clone(), event.sequence));
        }
    }

    /// Successful await output wrapping `value`, with no tool control.
    fn success(value: serde_json::Value) -> ProcessAwaitOutput {
        ProcessAwaitOutput::Success {
            value,
            control: None::<ToolControl>,
        }
    }

    /// Pins the exact backoff schedule: 25ms doubling up to a 1s cap.
    #[test]
    fn backoff_schedule_has_25ms_floor_doubling_to_1s_cap() {
        assert_eq!(AWAIT_BACKOFF_MIN, Duration::from_millis(25));
        assert_eq!(AWAIT_BACKOFF_MAX, Duration::from_secs(1));

        let mut backoff = AWAIT_BACKOFF_MIN;
        let mut schedule = vec![backoff];
        while backoff < AWAIT_BACKOFF_MAX {
            backoff = next_backoff(backoff);
            schedule.push(backoff);
        }
        assert_eq!(
            schedule,
            [25, 50, 100, 200, 400, 800, 1000]
                .into_iter()
                .map(Duration::from_millis)
                .collect::<Vec<_>>(),
            "the backoff doubles from the 25ms floor and saturates at the 1s cap"
        );
        assert_eq!(
            next_backoff(AWAIT_BACKOFF_MAX),
            AWAIT_BACKOFF_MAX,
            "the cap is absorbing"
        );
    }

    /// Pruning through the decorator must not signal any subscriber.
    #[tokio::test]
    async fn prune_through_decorator_does_not_bump_the_hub() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        registry
            .register_process(registration("proc-terminal"))
            .await
            .expect("register terminal");
        registry
            .complete_process("proc-terminal", success(serde_json::json!("done")))
            .await
            .expect("complete");
        registry
            .register_process(registration("proc-live"))
            .await
            .expect("register live");

        // Subscribe after the setup mutations and clear the changed flag so
        // only the prune itself could flip it.
        let mut terminal_rx = hub.subscribe("proc-terminal");
        let mut live_rx = hub.subscribe("proc-live");
        terminal_rx.mark_unchanged();
        live_rx.mark_unchanged();

        let report = registry
            .prune_terminal_processes(u64::MAX)
            .await
            .expect("prune");
        assert_eq!(report.pruned_processes, 1, "the terminal process pruned");

        assert!(
            !terminal_rx.has_changed().expect("terminal sender open"),
            "prune must not bump the pruned process's hub entry"
        );
        assert!(
            !live_rx.has_changed().expect("live sender open"),
            "prune must not bump surviving processes' hub entries"
        );
    }

    /// A notify wakes a live subscriber; a notify with no subscribers left
    /// removes the entry from the hub.
    #[tokio::test]
    async fn hub_subscribe_then_notify_wakes_and_gc_drops_empty_entry() {
        let hub = ProcessChangeHub::new();
        let mut rx = hub.subscribe("proc");
        hub.notify("proc");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("notify should wake")
            .expect("sender remains open");

        drop(rx);
        hub.notify("proc");
        assert_eq!(hub.tracked_processes(), 0);
    }

    /// An event appended before the await starts is returned by the first read.
    #[tokio::test]
    async fn await_event_returns_historical_event_immediately() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        let appended = registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::cancel_requested("proc", Some("stop".to_string())),
            )
            .await
            .expect("append");

        let event = ProcessAwaiter::new(Arc::clone(&registry), hub)
            .await_event("proc", "process.cancel_requested", 0)
            .await
            .expect("await event");
        assert_eq!(event.sequence, appended.event.sequence);
    }

    /// Awaiting an unregistered process id fails instead of waiting forever.
    #[tokio::test]
    async fn await_terminal_unknown_process_errors() {
        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let err = ProcessAwaiter::polling(registry)
            .await_terminal("missing")
            .await
            .expect_err("unknown process should error");
        assert!(err.to_string().contains("unknown process `missing`"));
    }

    /// Without a hub, the awaiter still resolves by polling on the backoff timer.
    #[tokio::test]
    async fn polling_awaiter_resolves_via_backoff() {
        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        let writer = Arc::clone(&registry);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            writer
                .complete_process("proc", success(serde_json::json!({ "ok": true })))
                .await
                .expect("complete");
        });

        let output = tokio::time::timeout(
            Duration::from_secs(1),
            ProcessAwaiter::polling(registry).await_terminal("proc"),
        )
        .await
        .expect("polling await timeout")
        .expect("await terminal");
        assert_eq!(output, success(serde_json::json!({ "ok": true })));
    }

    /// A hub-backed awaiter racing a completion resolves within the timeout.
    #[tokio::test]
    async fn watched_awaiter_observes_terminal_without_lost_wakeup() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        let awaiter = ProcessAwaiter::new(Arc::clone(&registry), hub);
        let waiter = tokio::spawn(async move { awaiter.await_terminal("proc").await });
        registry
            .complete_process("proc", success(serde_json::json!("done")))
            .await
            .expect("complete");

        let output = tokio::time::timeout(Duration::from_millis(200), waiter)
            .await
            .expect("watched await timeout")
            .expect("join")
            .expect("await terminal");
        assert_eq!(output, success(serde_json::json!("done")));
    }

    /// Both registration and event append bump an existing subscriber.
    #[tokio::test]
    async fn watched_registry_bumps_on_mutations() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        let mut rx = hub.subscribe("proc");
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("register bump")
            .expect("sender remains open");

        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::cancel_requested("proc", None),
            )
            .await
            .expect("append");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("append bump")
            .expect("sender remains open");
    }

    /// The sink sees each appended event, with its sequence, in append order.
    #[tokio::test]
    async fn sink_receives_appended_events_in_order() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let sink = CollectingSink::default();
        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
        registry
            .register_process(registration_with_events(
                "proc",
                &["producer.a", "producer.b"],
            ))
            .await
            .expect("register");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("append a");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.b", serde_json::json!({})),
            )
            .await
            .expect("append b");

        assert_eq!(
            sink.collected(),
            vec![("producer.a".to_string(), 1), ("producer.b".to_string(), 2)],
            "the sink must observe appended events after their write, in append order"
        );
    }

    /// Appending works the same when no sink is installed.
    #[tokio::test]
    async fn sink_absent_leaves_appends_unchanged() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, _hub) = watch_process_registry_with_sink(raw, None);
        registry
            .register_process(registration_with_events("proc", &["producer.a"]))
            .await
            .expect("register");
        let appended = registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("append succeeds with no sink installed");
        assert_eq!(appended.event.sequence, 1);
    }

    /// Only explicit `append_event` calls reach the sink; `complete_process`
    /// does not.
    #[tokio::test]
    async fn sink_not_invoked_for_complete_process_terminal_append() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let sink = CollectingSink::default();
        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
        registry
            .register_process(registration_with_events("proc", &["producer.a"]))
            .await
            .expect("register");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("explicit append");
        registry
            .complete_process("proc", success(serde_json::json!("done")))
            .await
            .expect("complete");

        assert_eq!(
            sink.collected(),
            vec![("producer.a".to_string(), 1)],
            "complete_process appends its terminal event through the inner registry, so the \
             decorator never emits it to the sink"
        );
    }

    /// Installing a sink does not suppress the hub bump on append.
    #[tokio::test]
    async fn sink_present_still_bumps_hub_on_append() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let sink = CollectingSink::default();
        let (registry, hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink)));
        let mut rx = hub.subscribe("proc");
        registry
            .register_process(registration_with_events("proc", &["producer.a"]))
            .await
            .expect("register");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("register bump")
            .expect("sender remains open");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("append");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("append bump with a sink installed")
            .expect("sender remains open");
    }

    /// Run handle that does nothing and always succeeds.
    struct NoopRunHandle;

    #[async_trait::async_trait]
    impl crate::ProcessRunHandle for NoopRunHandle {
        async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
            Ok(())
        }
    }

    /// Attach double that panics if it is ever consulted.
    struct PanicAttach;

    #[async_trait::async_trait]
    impl ProcessAttach for PanicAttach {
        async fn await_terminal(
            &self,
            _process_id: &str,
        ) -> Result<ProcessAwaitOutput, PluginError> {
            panic!("attach should not be called for already-terminal process")
        }
    }

    /// Attach double that always fails with "attach failed".
    struct ErrorAttach;

    #[async_trait::async_trait]
    impl ProcessAttach for ErrorAttach {
        async fn await_terminal(
            &self,
            _process_id: &str,
        ) -> Result<ProcessAwaitOutput, PluginError> {
            Err(PluginError::Session("attach failed".to_string()))
        }
    }

    /// An already-terminal process resolves without touching the attach.
    #[tokio::test]
    async fn driver_short_circuits_terminal_before_attach() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
            .with_attach(Arc::new(PanicAttach));
        let registry = driver.process_registry();
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        registry
            .complete_process("proc", success(serde_json::json!("ready")))
            .await
            .expect("complete");

        let output = driver.await_terminal("proc").await.expect("await terminal");
        assert_eq!(output, success(serde_json::json!("ready")));
    }

    /// An attach error surfaces to the caller rather than being masked.
    #[tokio::test]
    async fn driver_attach_errors_propagate_without_poll_fallback() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
            .with_attach(Arc::new(ErrorAttach));
        driver
            .process_registry()
            .register_process(registration("proc"))
            .await
            .expect("register");

        let err = driver
            .await_terminal("proc")
            .await
            .expect_err("attach error should propagate");
        assert!(err.to_string().contains("attach failed"));
    }

    /// Attach double that counts calls and always errors.
    struct CountingAttach {
        calls: Arc<std::sync::atomic::AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl ProcessAttach for CountingAttach {
        async fn await_terminal(
            &self,
            _process_id: &str,
        ) -> Result<ProcessAwaitOutput, PluginError> {
            self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Err(PluginError::Session(
                "attach must not be consulted for a terminal process".to_string(),
            ))
        }
    }

    /// Sixteen waiters released together all resolve with the same output.
    #[tokio::test]
    async fn concurrent_waiters_all_resolve_with_identical_output_on_completion() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");

        const WAITERS: usize = 16;
        // One extra party for the test body, so all waiters start together.
        let barrier = Arc::new(tokio::sync::Barrier::new(WAITERS + 1));
        let mut waiters = Vec::with_capacity(WAITERS);
        for _ in 0..WAITERS {
            let awaiter = ProcessAwaiter::new(Arc::clone(&registry), hub.clone());
            let barrier = Arc::clone(&barrier);
            waiters.push(tokio::spawn(async move {
                barrier.wait().await;
                awaiter.await_terminal("proc").await
            }));
        }
        barrier.wait().await;
        let output = success(serde_json::json!({ "raced": true }));
        registry
            .complete_process("proc", output.clone())
            .await
            .expect("complete");

        for waiter in waiters {
            let resolved = tokio::time::timeout(Duration::from_secs(2), waiter)
                .await
                .expect("each racing waiter resolves under 2s")
                .expect("join waiter")
                .expect("await terminal");
            assert_eq!(
                resolved, output,
                "every concurrent waiter resolves with identical terminal output"
            );
        }
    }

    /// Awaiting an already-terminal process returns promptly with zero attach
    /// calls.
    #[tokio::test]
    async fn driver_reattach_after_terminal_short_circuits_without_engine_call() {
        use std::sync::atomic::Ordering;

        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle)).with_attach(
            Arc::new(CountingAttach {
                calls: Arc::clone(&calls),
            }),
        );
        let registry = driver.process_registry();
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        let output = success(serde_json::json!("reattached"));
        registry
            .complete_process("proc", output.clone())
            .await
            .expect("complete");

        let start = std::time::Instant::now();
        let resolved = driver.await_terminal("proc").await.expect("await terminal");
        assert_eq!(resolved, output);
        assert_eq!(
            calls.load(Ordering::SeqCst),
            0,
            "a terminal short-circuit must never call the engine attach"
        );
        assert!(
            start.elapsed() < Duration::from_millis(500),
            "a short-circuit resolves without any backoff wait"
        );
    }

    /// Sink that simulates push loss: even sequences go to `dropped`, odd
    /// sequences to `seen`.
    #[derive(Clone, Default)]
    struct LossySink {
        seen: Arc<Mutex<Vec<u64>>>,
        dropped: Arc<Mutex<Vec<u64>>>,
    }

    #[async_trait::async_trait]
    impl ProcessEventSink for LossySink {
        async fn emit(&self, event: &ProcessEvent) {
            if event.sequence.is_multiple_of(2) {
                self.dropped.lock().expect("sink lock").push(event.sequence);
            } else {
                self.seen.lock().expect("sink lock").push(event.sequence);
            }
        }
    }

    /// Even when the sink misses events, `events_after` returns the full log.
    #[tokio::test]
    async fn lossy_sink_still_reconciles_complete_log_from_events_after() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let sink = LossySink::default();
        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
        registry
            .register_process(registration_with_events("proc", &["producer.step"]))
            .await
            .expect("register");

        const EVENTS: u64 = 6;
        for _ in 0..EVENTS {
            registry
                .append_event(
                    "proc",
                    ProcessEventAppendRequest::new("producer.step", serde_json::json!({})),
                )
                .await
                .expect("append");
        }
        registry
            .complete_process("proc", success(serde_json::json!("done")))
            .await
            .expect("complete");

        assert!(
            !sink.dropped.lock().expect("sink lock").is_empty(),
            "the lossy sink must drop at least one emit for the scenario to be meaningful"
        );
        assert!(
            (sink.seen.lock().expect("sink lock").len() as u64) < EVENTS,
            "the sink observed fewer events than were appended"
        );
        let reconciled = registry
            .events_after("proc", 0)
            .await
            .expect("events")
            .into_iter()
            .filter(|event| event.event_type == "producer.step")
            .map(|event| event.sequence)
            .collect::<Vec<_>>();
        assert_eq!(
            reconciled,
            (1..=EVENTS).collect::<Vec<_>>(),
            "events_after reconciles the complete non-terminal log despite push loss"
        );
    }
}