1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tokio::sync::watch;
6
7use super::events::{
8 ProcessAwaitOutput, ProcessEvent, ProcessEventAppendRequest, ProcessEventAppendResult,
9};
10use super::model::{
11 ProcessExternalRef, ProcessHandleDescriptor, ProcessHandleGrant, ProcessHandleGrantEntry,
12 ProcessLease, ProcessLeaseClaimOutcome, ProcessLeaseCompletion, ProcessListFilter,
13 ProcessRecord, ProcessRegistration, ProcessSessionDeleteReport, SessionScope, WaitState,
14};
15use super::registry::{ProcessPruneReport, ProcessRegistry};
16use crate::PluginError;
17
/// Initial delay between registry re-reads while awaiting; the awaiter also
/// resets its delay to this value whenever a change notification arrives.
const AWAIT_BACKOFF_MIN: Duration = Duration::from_millis(25);

/// Upper bound applied by `next_backoff` so the polling delay never exceeds
/// one second.
const AWAIT_BACKOFF_MAX: Duration = Duration::from_millis(1_000);
20
21#[derive(Clone, Default)]
22pub struct ProcessChangeHub {
23 inner: Arc<Mutex<HashMap<String, watch::Sender<u64>>>>,
24}
25
26impl ProcessChangeHub {
27 pub fn new() -> Self {
28 Self::default()
29 }
30
31 pub fn subscribe(&self, process_id: &str) -> watch::Receiver<u64> {
34 let mut guard = self.inner.lock().expect("process change hub lock poisoned");
35 guard
36 .entry(process_id.to_string())
37 .or_insert_with(|| {
38 let (tx, _rx) = watch::channel(0);
39 tx
40 })
41 .subscribe()
42 }
43
44 pub fn notify(&self, process_id: &str) {
45 let mut guard = self.inner.lock().expect("process change hub lock poisoned");
46 let mut remove = false;
47 if let Some(tx) = guard.get(process_id) {
48 if tx.receiver_count() == 0 {
49 remove = true;
50 } else {
51 let next = (*tx.borrow()).wrapping_add(1);
52 if tx.send(next).is_err() {
53 remove = true;
54 }
55 }
56 }
57 if remove {
58 guard.remove(process_id);
59 }
60 }
61
62 #[cfg(test)]
63 fn tracked_processes(&self) -> usize {
64 self.inner
65 .lock()
66 .expect("process change hub lock poisoned")
67 .len()
68 }
69}
70
71#[async_trait::async_trait]
102pub trait ProcessEventSink: Send + Sync {
103 async fn emit(&self, event: &ProcessEvent);
107}
108
109struct WatchedProcessRegistry {
117 inner: Arc<dyn ProcessRegistry>,
118 hub: ProcessChangeHub,
119 sink: Option<Arc<dyn ProcessEventSink>>,
120}
121
122pub fn watch_process_registry(
128 inner: Arc<dyn ProcessRegistry>,
129) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
130 watch_process_registry_with_sink(inner, None)
131}
132
133pub fn watch_process_registry_with_sink(
138 inner: Arc<dyn ProcessRegistry>,
139 sink: Option<Arc<dyn ProcessEventSink>>,
140) -> (Arc<dyn ProcessRegistry>, ProcessChangeHub) {
141 let hub = ProcessChangeHub::new();
142 (
143 Arc::new(WatchedProcessRegistry {
144 inner,
145 hub: hub.clone(),
146 sink,
147 }),
148 hub,
149 )
150}
151
152#[derive(Clone)]
153pub struct ProcessAwaiter {
154 registry: Arc<dyn ProcessRegistry>,
155 hub: Option<ProcessChangeHub>,
156}
157
158impl ProcessAwaiter {
159 pub fn new(registry: Arc<dyn ProcessRegistry>, hub: ProcessChangeHub) -> Self {
160 Self {
161 registry,
162 hub: Some(hub),
163 }
164 }
165
166 pub fn polling(registry: Arc<dyn ProcessRegistry>) -> Self {
167 Self {
168 registry,
169 hub: None,
170 }
171 }
172
173 pub async fn await_terminal(
174 &self,
175 process_id: &str,
176 ) -> Result<ProcessAwaitOutput, PluginError> {
177 let mut backoff = AWAIT_BACKOFF_MIN;
178 if let Some(hub) = self.hub.as_ref() {
179 let mut rx = hub.subscribe(process_id);
180 loop {
181 if let Some(output) = self.read_terminal(process_id).await? {
182 return Ok(output);
183 }
184 tokio::select! {
185 changed = rx.changed() => {
186 match changed {
187 Ok(()) => backoff = AWAIT_BACKOFF_MIN,
188 Err(_) => break,
194 }
195 }
196 _ = tokio::time::sleep(backoff) => {
197 backoff = next_backoff(backoff);
198 }
199 }
200 }
201 }
202 loop {
203 if let Some(output) = self.read_terminal(process_id).await? {
204 return Ok(output);
205 }
206 tokio::time::sleep(backoff).await;
207 backoff = next_backoff(backoff);
208 }
209 }
210
211 pub async fn await_event(
212 &self,
213 process_id: &str,
214 event_type: &str,
215 after_sequence: u64,
216 ) -> Result<ProcessEvent, PluginError> {
217 let mut backoff = AWAIT_BACKOFF_MIN;
218 if let Some(hub) = self.hub.as_ref() {
219 let mut rx = hub.subscribe(process_id);
220 loop {
221 if let Some(event) = self
222 .read_event(process_id, event_type, after_sequence)
223 .await?
224 {
225 return Ok(event);
226 }
227 tokio::select! {
228 changed = rx.changed() => {
229 match changed {
230 Ok(()) => backoff = AWAIT_BACKOFF_MIN,
231 Err(_) => break,
237 }
238 }
239 _ = tokio::time::sleep(backoff) => {
240 backoff = next_backoff(backoff);
241 }
242 }
243 }
244 }
245 loop {
246 if let Some(event) = self
247 .read_event(process_id, event_type, after_sequence)
248 .await?
249 {
250 return Ok(event);
251 }
252 tokio::time::sleep(backoff).await;
253 backoff = next_backoff(backoff);
254 }
255 }
256
257 async fn read_terminal(
258 &self,
259 process_id: &str,
260 ) -> Result<Option<ProcessAwaitOutput>, PluginError> {
261 let record = self
262 .registry
263 .get_process(process_id)
264 .await
265 .ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
266 Ok(record.status.await_output().cloned())
267 }
268
269 async fn read_event(
270 &self,
271 process_id: &str,
272 event_type: &str,
273 after_sequence: u64,
274 ) -> Result<Option<ProcessEvent>, PluginError> {
275 Ok(self
276 .registry
277 .events_after(process_id, after_sequence)
278 .await?
279 .into_iter()
280 .find(|event| event.event_type == event_type))
281 }
282}
283
284fn next_backoff(current: Duration) -> Duration {
285 current.saturating_mul(2).min(AWAIT_BACKOFF_MAX)
286}
287
288#[async_trait::async_trait]
289pub trait ProcessAttach: Send + Sync {
290 async fn await_terminal(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError>;
291}
292
293#[async_trait::async_trait]
294impl ProcessRegistry for WatchedProcessRegistry {
295 fn durability_tier(&self) -> crate::DurabilityTier {
296 self.inner.durability_tier()
297 }
298
299 async fn register_process(
300 &self,
301 registration: ProcessRegistration,
302 ) -> Result<ProcessRecord, PluginError> {
303 let process_id = registration.id.clone();
304 let record = self.inner.register_process(registration).await?;
305 self.hub.notify(&process_id);
306 Ok(record)
307 }
308
309 async fn set_external_ref(
310 &self,
311 process_id: &str,
312 external_ref: ProcessExternalRef,
313 ) -> Result<ProcessRecord, PluginError> {
314 let record = self
315 .inner
316 .set_external_ref(process_id, external_ref)
317 .await?;
318 self.hub.notify(process_id);
319 Ok(record)
320 }
321
322 async fn grant_handle(
323 &self,
324 session_scope: &SessionScope,
325 process_id: &str,
326 descriptor: ProcessHandleDescriptor,
327 ) -> Result<ProcessHandleGrant, PluginError> {
328 self.inner
329 .grant_handle(session_scope, process_id, descriptor)
330 .await
331 }
332
333 async fn revoke_handle(
334 &self,
335 session_scope: &SessionScope,
336 process_id: &str,
337 ) -> Result<(), PluginError> {
338 self.inner.revoke_handle(session_scope, process_id).await
339 }
340
341 async fn transfer_handle_grants(
342 &self,
343 from_scope: &SessionScope,
344 to_scope: &SessionScope,
345 process_ids: &[String],
346 ) -> Result<(), PluginError> {
347 self.inner
348 .transfer_handle_grants(from_scope, to_scope, process_ids)
349 .await
350 }
351
352 async fn list_handle_grants(
353 &self,
354 session_scope: &SessionScope,
355 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
356 self.inner.list_handle_grants(session_scope).await
357 }
358
359 async fn list_live_handle_grants(
360 &self,
361 session_scope: &SessionScope,
362 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
363 self.inner.list_live_handle_grants(session_scope).await
364 }
365
366 async fn has_handle_grant(
367 &self,
368 session_scope: &SessionScope,
369 process_id: &str,
370 ) -> Result<bool, PluginError> {
371 self.inner.has_handle_grant(session_scope, process_id).await
372 }
373
374 async fn handle_grants_for_process(
375 &self,
376 process_id: &str,
377 ) -> Result<Vec<ProcessHandleGrant>, PluginError> {
378 self.inner.handle_grants_for_process(process_id).await
379 }
380
381 async fn delete_session_process_state(
382 &self,
383 session_id: &str,
384 ) -> Result<ProcessSessionDeleteReport, PluginError> {
385 self.inner.delete_session_process_state(session_id).await
386 }
387
388 async fn append_event(
389 &self,
390 process_id: &str,
391 request: ProcessEventAppendRequest,
392 ) -> Result<ProcessEventAppendResult, PluginError> {
393 let result = self.inner.append_event(process_id, request).await?;
394 self.hub.notify(process_id);
395 if let Some(sink) = self.sink.as_ref() {
399 sink.emit(&result.event).await;
400 }
401 Ok(result)
402 }
403
404 async fn events_after(
405 &self,
406 process_id: &str,
407 after_sequence: u64,
408 ) -> Result<Vec<ProcessEvent>, PluginError> {
409 self.inner.events_after(process_id, after_sequence).await
410 }
411
412 async fn count_events_through(
413 &self,
414 process_id: &str,
415 event_type: &str,
416 up_to_sequence: u64,
417 ) -> Result<u64, PluginError> {
418 self.inner
419 .count_events_through(process_id, event_type, up_to_sequence)
420 .await
421 }
422
423 async fn recent_events(
424 &self,
425 process_id: &str,
426 limit: usize,
427 ) -> Result<Vec<ProcessEvent>, PluginError> {
428 self.inner.recent_events(process_id, limit).await
429 }
430
431 async fn wake_events_after(
432 &self,
433 process_id: &str,
434 after_sequence: u64,
435 ) -> Result<Vec<ProcessEvent>, PluginError> {
436 self.inner
437 .wake_events_after(process_id, after_sequence)
438 .await
439 }
440
441 async fn complete_process(
442 &self,
443 process_id: &str,
444 await_output: ProcessAwaitOutput,
445 ) -> Result<ProcessRecord, PluginError> {
446 let record = self
447 .inner
448 .complete_process(process_id, await_output)
449 .await?;
450 self.hub.notify(process_id);
451 Ok(record)
452 }
453
454 async fn set_process_wait(
455 &self,
456 process_id: &str,
457 wait: WaitState,
458 ) -> Result<ProcessRecord, PluginError> {
459 let record = self.inner.set_process_wait(process_id, wait).await?;
460 self.hub.notify(process_id);
461 Ok(record)
462 }
463
464 async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
465 let record = self.inner.clear_process_wait(process_id).await?;
466 self.hub.notify(process_id);
467 Ok(record)
468 }
469
470 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
471 self.inner.get_process(process_id).await
472 }
473
474 async fn list_processes(
475 &self,
476 filter: &ProcessListFilter,
477 ) -> Result<Vec<ProcessRecord>, PluginError> {
478 self.inner.list_processes(filter).await
479 }
480
481 async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
482 self.inner.ack_wake(process_id, sequence).await?;
483 self.hub.notify(process_id);
484 Ok(())
485 }
486
487 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
488 self.inner.list_non_terminal().await
489 }
490
491 async fn claim_process_lease(
492 &self,
493 process_id: &str,
494 owner: &crate::LeaseOwnerIdentity,
495 lease_ttl_ms: u64,
496 ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
497 self.inner
498 .claim_process_lease(process_id, owner, lease_ttl_ms)
499 .await
500 }
501
502 async fn reclaim_process_lease(
503 &self,
504 process_id: &str,
505 owner: &crate::LeaseOwnerIdentity,
506 observed_holder: &ProcessLease,
507 lease_ttl_ms: u64,
508 ) -> Result<ProcessLeaseClaimOutcome, PluginError> {
509 self.inner
510 .reclaim_process_lease(process_id, owner, observed_holder, lease_ttl_ms)
511 .await
512 }
513
514 async fn renew_process_lease(
515 &self,
516 lease: &ProcessLease,
517 lease_ttl_ms: u64,
518 ) -> Result<ProcessLease, PluginError> {
519 self.inner.renew_process_lease(lease, lease_ttl_ms).await
520 }
521
522 async fn complete_process_lease(
523 &self,
524 completion: &ProcessLeaseCompletion,
525 ) -> Result<(), PluginError> {
526 self.inner.complete_process_lease(completion).await
527 }
528
529 async fn prune_terminal_processes(
530 &self,
531 cutoff_epoch_ms: u64,
532 ) -> Result<ProcessPruneReport, PluginError> {
533 self.inner.prune_terminal_processes(cutoff_epoch_ms).await
536 }
537}
538
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        ProcessInput, ProcessProvenance, ProcessRegistration, TestLocalProcessRegistry, ToolControl,
    };

    /// Minimal registration for an externally driven process with empty metadata.
    fn registration(process_id: &str) -> ProcessRegistration {
        ProcessRegistration::new(
            process_id,
            ProcessInput::External {
                metadata: serde_json::json!({}),
            },
            ProcessProvenance::host(),
        )
    }

    /// Event type named `name` with an "any" payload schema and default semantics.
    fn plain_event_type(name: &str) -> crate::ProcessEventType {
        crate::ProcessEventType {
            name: name.to_string(),
            payload_schema: crate::LashSchema::any(),
            semantics: crate::ProcessEventSemanticsSpec::default(),
        }
    }

    /// Registration that additionally declares the given extra event types.
    fn registration_with_events(process_id: &str, event_types: &[&str]) -> ProcessRegistration {
        registration(process_id)
            .with_extra_event_types(event_types.iter().map(|name| plain_event_type(name)))
    }

    /// Sink that records `(event_type, sequence)` for every emitted event.
    #[derive(Clone, Default)]
    struct CollectingSink {
        events: Arc<Mutex<Vec<(String, u64)>>>,
    }

    impl CollectingSink {
        /// Snapshot of everything emitted so far, in emission order.
        fn collected(&self) -> Vec<(String, u64)> {
            self.events.lock().expect("sink lock").clone()
        }
    }

    #[async_trait::async_trait]
    impl ProcessEventSink for CollectingSink {
        async fn emit(&self, event: &ProcessEvent) {
            self.events
                .lock()
                .expect("sink lock")
                .push((event.event_type.clone(), event.sequence));
        }
    }

    /// Successful await output carrying `value` and no tool control.
    fn success(value: serde_json::Value) -> ProcessAwaitOutput {
        ProcessAwaitOutput::Success {
            value,
            control: None::<ToolControl>,
        }
    }

    #[tokio::test]
    async fn hub_subscribe_then_notify_wakes_and_gc_drops_empty_entry() {
        let hub = ProcessChangeHub::new();
        let mut rx = hub.subscribe("proc");
        hub.notify("proc");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("notify should wake")
            .expect("sender remains open");

        // With the only receiver gone, the next notify removes the entry.
        drop(rx);
        hub.notify("proc");
        assert_eq!(hub.tracked_processes(), 0);
    }

    #[tokio::test]
    async fn await_event_returns_historical_event_immediately() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        let appended = registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::cancel_requested("proc", Some("stop".to_string())),
            )
            .await
            .expect("append");

        let event = ProcessAwaiter::new(Arc::clone(&registry), hub)
            .await_event("proc", "process.cancel_requested", 0)
            .await
            .expect("await event");
        assert_eq!(event.sequence, appended.event.sequence);
    }

    #[tokio::test]
    async fn await_terminal_unknown_process_errors() {
        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let err = ProcessAwaiter::polling(registry)
            .await_terminal("missing")
            .await
            .expect_err("unknown process should error");
        assert!(err.to_string().contains("unknown process `missing`"));
    }

    #[tokio::test]
    async fn polling_awaiter_resolves_via_backoff() {
        let registry = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        let writer = Arc::clone(&registry);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            writer
                .complete_process("proc", success(serde_json::json!({ "ok": true })))
                .await
                .expect("complete");
        });

        let output = tokio::time::timeout(
            Duration::from_secs(1),
            ProcessAwaiter::polling(registry).await_terminal("proc"),
        )
        .await
        .expect("polling await timeout")
        .expect("await terminal");
        assert_eq!(output, success(serde_json::json!({ "ok": true })));
    }

    #[tokio::test]
    async fn watched_awaiter_observes_terminal_without_lost_wakeup() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        let awaiter = ProcessAwaiter::new(Arc::clone(&registry), hub);
        let waiter = tokio::spawn(async move { awaiter.await_terminal("proc").await });
        registry
            .complete_process("proc", success(serde_json::json!("done")))
            .await
            .expect("complete");

        let output = tokio::time::timeout(Duration::from_millis(200), waiter)
            .await
            .expect("watched await timeout")
            .expect("join")
            .expect("await terminal");
        assert_eq!(output, success(serde_json::json!("done")));
    }

    #[tokio::test]
    async fn watched_registry_bumps_on_mutations() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, hub) = watch_process_registry(raw);
        let mut rx = hub.subscribe("proc");
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("register bump")
            .expect("sender remains open");

        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::cancel_requested("proc", None),
            )
            .await
            .expect("append");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("append bump")
            .expect("sender remains open");
    }

    #[tokio::test]
    async fn sink_receives_appended_events_in_order() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let sink = CollectingSink::default();
        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
        registry
            .register_process(registration_with_events(
                "proc",
                &["producer.a", "producer.b"],
            ))
            .await
            .expect("register");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("append a");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.b", serde_json::json!({})),
            )
            .await
            .expect("append b");

        assert_eq!(
            sink.collected(),
            vec![("producer.a".to_string(), 1), ("producer.b".to_string(), 2)],
            "the sink must observe appended events after their write, in append order"
        );
    }

    #[tokio::test]
    async fn sink_absent_leaves_appends_unchanged() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let (registry, _hub) = watch_process_registry_with_sink(raw, None);
        registry
            .register_process(registration_with_events("proc", &["producer.a"]))
            .await
            .expect("register");
        let appended = registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("append succeeds with no sink installed");
        assert_eq!(appended.event.sequence, 1);
    }

    #[tokio::test]
    async fn sink_not_invoked_for_complete_process_terminal_append() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let sink = CollectingSink::default();
        let (registry, _hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink.clone())));
        registry
            .register_process(registration_with_events("proc", &["producer.a"]))
            .await
            .expect("register");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("explicit append");
        registry
            .complete_process("proc", success(serde_json::json!("done")))
            .await
            .expect("complete");

        assert_eq!(
            sink.collected(),
            vec![("producer.a".to_string(), 1)],
            "complete_process appends its terminal event through the inner registry, so the \
             decorator never emits it to the sink"
        );
    }

    #[tokio::test]
    async fn sink_present_still_bumps_hub_on_append() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let sink = CollectingSink::default();
        let (registry, hub) = watch_process_registry_with_sink(raw, Some(Arc::new(sink)));
        let mut rx = hub.subscribe("proc");
        registry
            .register_process(registration_with_events("proc", &["producer.a"]))
            .await
            .expect("register");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("register bump")
            .expect("sender remains open");
        registry
            .append_event(
                "proc",
                ProcessEventAppendRequest::new("producer.a", serde_json::json!({})),
            )
            .await
            .expect("append");
        tokio::time::timeout(Duration::from_millis(100), rx.changed())
            .await
            .expect("append bump with a sink installed")
            .expect("sender remains open");
    }

    /// Run handle whose `claim_and_run_pending` succeeds without doing anything.
    struct NoopRunHandle;

    #[async_trait::async_trait]
    impl crate::ProcessRunHandle for NoopRunHandle {
        async fn claim_and_run_pending(&self) -> Result<(), PluginError> {
            Ok(())
        }
    }

    /// Attach implementation that panics if it is ever consulted.
    struct PanicAttach;

    #[async_trait::async_trait]
    impl ProcessAttach for PanicAttach {
        async fn await_terminal(
            &self,
            _process_id: &str,
        ) -> Result<ProcessAwaitOutput, PluginError> {
            panic!("attach should not be called for already-terminal process")
        }
    }

    /// Attach implementation that always fails with a session error.
    struct ErrorAttach;

    #[async_trait::async_trait]
    impl ProcessAttach for ErrorAttach {
        async fn await_terminal(
            &self,
            _process_id: &str,
        ) -> Result<ProcessAwaitOutput, PluginError> {
            Err(PluginError::Session("attach failed".to_string()))
        }
    }

    #[tokio::test]
    async fn driver_short_circuits_terminal_before_attach() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
            .with_attach(Arc::new(PanicAttach));
        let registry = driver.process_registry();
        registry
            .register_process(registration("proc"))
            .await
            .expect("register");
        registry
            .complete_process("proc", success(serde_json::json!("ready")))
            .await
            .expect("complete");

        let output = driver.await_terminal("proc").await.expect("await terminal");
        assert_eq!(output, success(serde_json::json!("ready")));
    }

    #[tokio::test]
    async fn driver_attach_errors_propagate_without_poll_fallback() {
        let raw = Arc::new(TestLocalProcessRegistry::default()) as Arc<dyn ProcessRegistry>;
        let driver = crate::ProcessWorkDriver::new(raw, Arc::new(NoopRunHandle))
            .with_attach(Arc::new(ErrorAttach));
        driver
            .process_registry()
            .register_process(registration("proc"))
            .await
            .expect("register");

        let err = driver
            .await_terminal("proc")
            .await
            .expect_err("attach error should propagate");
        assert!(err.to_string().contains("attach failed"));
    }
}