1#![allow(missing_docs)]
4use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6use std::time::Duration;
7
8use uuid::Uuid;
9
10use crate::agent::OnInput;
11use crate::agent::events::{AgentEvent, OnEvent};
12use crate::error::Error;
13use crate::llm::{ApprovalDecision, OnApproval, OnText};
14use crate::tool::builtins::{OnQuestion, QuestionRequest, QuestionResponse};
15
16#[derive(Debug, Clone)]
18pub enum OutboundMessage {
19 TextDelta {
20 session_id: Uuid,
21 text: String,
22 },
23 AgentEvent {
24 session_id: Uuid,
25 event: AgentEvent,
26 },
27 InputNeeded {
28 session_id: Uuid,
29 interaction_id: Uuid,
30 },
31 ApprovalNeeded {
32 session_id: Uuid,
33 interaction_id: Uuid,
34 tool_calls: serde_json::Value,
35 },
36 QuestionNeeded {
37 session_id: Uuid,
38 interaction_id: Uuid,
39 request: QuestionRequest,
40 },
41 ChatFinal {
43 session_id: Uuid,
44 result: String,
45 },
46 ChatError {
48 session_id: Uuid,
49 error: String,
50 },
51 RawFrame(crate::channel::types::WsFrame),
53}
54
55struct PendingEntry {
64 session_id: Uuid,
65 sender: PendingSender,
66}
67
68enum PendingSender {
69 Input(tokio::sync::oneshot::Sender<Option<String>>),
70 Approval(std::sync::mpsc::Sender<ApprovalDecision>),
71 Question(tokio::sync::oneshot::Sender<Result<QuestionResponse, Error>>),
72}
73
/// How long a timed-out interaction stays in the pending map before it is
/// removed.
///
/// During this window a late `resolve_*` call still finds the entry and
/// returns `Ok(())`, even though the waiting callback has already given up.
const GRACE_PERIOD: Duration = Duration::from_secs(15);
78
79pub struct InteractionBridge {
84 pending: RwLock<HashMap<Uuid, PendingEntry>>,
85 outbound: tokio::sync::mpsc::Sender<OutboundMessage>,
86 timeout: Duration,
87}
88
89impl InteractionBridge {
90 pub fn new(outbound: tokio::sync::mpsc::Sender<OutboundMessage>, timeout: Duration) -> Self {
92 Self {
93 pending: RwLock::new(HashMap::new()),
94 outbound,
95 timeout,
96 }
97 }
98
99 pub fn make_on_text(self: &Arc<Self>, session_id: Uuid) -> Arc<OnText> {
101 let outbound = self.outbound.clone();
102 Arc::new(move |text: &str| {
103 let _ = outbound.try_send(OutboundMessage::TextDelta {
104 session_id,
105 text: text.to_string(),
106 });
107 })
108 }
109
110 pub fn make_on_event(self: &Arc<Self>, session_id: Uuid) -> Arc<OnEvent> {
112 let outbound = self.outbound.clone();
113 Arc::new(move |event: AgentEvent| {
114 let _ = outbound.try_send(OutboundMessage::AgentEvent { session_id, event });
115 })
116 }
117
118 pub fn make_on_input(self: &Arc<Self>, session_id: Uuid) -> Arc<OnInput> {
123 let bridge = Arc::clone(self);
124 Arc::new(move || {
125 let bridge = Arc::clone(&bridge);
126 Box::pin(async move {
127 let interaction_id = Uuid::new_v4();
128 let (tx, rx) = tokio::sync::oneshot::channel();
129
130 {
132 let mut pending = bridge.pending.write().expect("pending lock not poisoned");
133 pending.insert(
134 interaction_id,
135 PendingEntry {
136 session_id,
137 sender: PendingSender::Input(tx),
138 },
139 );
140 }
141
142 let _ = bridge.outbound.try_send(OutboundMessage::InputNeeded {
144 session_id,
145 interaction_id,
146 });
147
148 match tokio::time::timeout(bridge.timeout, rx).await {
150 Ok(Ok(msg)) => msg,
151 _ => {
152 let cleanup_bridge = Arc::clone(&bridge);
155 tokio::spawn(async move {
156 tokio::time::sleep(GRACE_PERIOD).await;
157 cleanup_bridge.cleanup_pending(interaction_id);
158 });
159 None
160 }
161 }
162 })
163 })
164 }
165
166 pub fn make_on_approval(self: &Arc<Self>, session_id: Uuid) -> Arc<OnApproval> {
171 let bridge = Arc::clone(self);
172 Arc::new(move |tool_calls: &[crate::llm::types::ToolCall]| {
173 let interaction_id = Uuid::new_v4();
174 let (tx, rx) = std::sync::mpsc::channel();
175
176 let tool_calls_json = serde_json::to_value(tool_calls).unwrap_or_default();
178
179 {
181 let mut pending = bridge.pending.write().expect("pending lock not poisoned");
182 pending.insert(
183 interaction_id,
184 PendingEntry {
185 session_id,
186 sender: PendingSender::Approval(tx),
187 },
188 );
189 }
190
191 let _ = bridge.outbound.try_send(OutboundMessage::ApprovalNeeded {
193 session_id,
194 interaction_id,
195 tool_calls: tool_calls_json,
196 });
197
198 match rx.recv_timeout(bridge.timeout) {
200 Ok(decision) => decision,
201 Err(_) => {
202 let cleanup_bridge = Arc::clone(&bridge);
206 if let Ok(handle) = tokio::runtime::Handle::try_current() {
207 handle.spawn(async move {
208 tokio::time::sleep(GRACE_PERIOD).await;
209 cleanup_bridge.cleanup_pending(interaction_id);
210 });
211 } else {
212 bridge.cleanup_pending(interaction_id);
213 }
214 ApprovalDecision::Deny
215 }
216 }
217 })
218 }
219
220 pub fn make_on_question(self: &Arc<Self>, session_id: Uuid) -> Arc<OnQuestion> {
224 let bridge = Arc::clone(self);
225 Arc::new(move |request: QuestionRequest| {
226 let bridge = Arc::clone(&bridge);
227 let request_clone = request.clone();
228 Box::pin(async move {
229 let interaction_id = Uuid::new_v4();
230 let (tx, rx) = tokio::sync::oneshot::channel();
231
232 {
234 let mut pending = bridge.pending.write().expect("pending lock not poisoned");
235 pending.insert(
236 interaction_id,
237 PendingEntry {
238 session_id,
239 sender: PendingSender::Question(tx),
240 },
241 );
242 }
243
244 let _ = bridge.outbound.try_send(OutboundMessage::QuestionNeeded {
246 session_id,
247 interaction_id,
248 request: request_clone,
249 });
250
251 match tokio::time::timeout(bridge.timeout, rx).await {
253 Ok(Ok(result)) => result,
254 Ok(Err(_)) => {
255 let cleanup_bridge = Arc::clone(&bridge);
257 tokio::spawn(async move {
258 tokio::time::sleep(GRACE_PERIOD).await;
259 cleanup_bridge.cleanup_pending(interaction_id);
260 });
261 Err(Error::Channel("interaction channel closed".into()))
262 }
263 Err(_) => {
264 let cleanup_bridge = Arc::clone(&bridge);
266 tokio::spawn(async move {
267 tokio::time::sleep(GRACE_PERIOD).await;
268 cleanup_bridge.cleanup_pending(interaction_id);
269 });
270 Err(Error::Channel("interaction timed out".into()))
271 }
272 }
273 })
274 })
275 }
276
277 pub fn resolve_input(&self, id: Uuid, message: Option<String>) -> Result<(), Error> {
286 self.resolve_input_for_session(None, id, message)
287 }
288
289 pub fn resolve_input_for_session(
292 &self,
293 expected_session: Option<Uuid>,
294 id: Uuid,
295 message: Option<String>,
296 ) -> Result<(), Error> {
297 let entry = self.take_pending(id, expected_session)?;
298 match entry.sender {
299 PendingSender::Input(tx) => {
300 let _ = tx.send(message);
301 Ok(())
302 }
303 other => {
304 drop(other);
305 Err(Error::Channel(format!(
306 "interaction {id} is not an input interaction"
307 )))
308 }
309 }
310 }
311
312 pub fn resolve_approval(&self, id: Uuid, decision: ApprovalDecision) -> Result<(), Error> {
314 self.resolve_approval_for_session(None, id, decision)
315 }
316
317 pub fn resolve_approval_for_session(
320 &self,
321 expected_session: Option<Uuid>,
322 id: Uuid,
323 decision: ApprovalDecision,
324 ) -> Result<(), Error> {
325 let entry = self.take_pending(id, expected_session)?;
326 match entry.sender {
327 PendingSender::Approval(tx) => {
328 let _ = tx.send(decision);
329 Ok(())
330 }
331 other => {
332 drop(other);
333 Err(Error::Channel(format!(
334 "interaction {id} is not an approval interaction"
335 )))
336 }
337 }
338 }
339
340 pub fn resolve_question(&self, id: Uuid, response: QuestionResponse) -> Result<(), Error> {
342 self.resolve_question_for_session(None, id, response)
343 }
344
345 pub fn resolve_question_for_session(
348 &self,
349 expected_session: Option<Uuid>,
350 id: Uuid,
351 response: QuestionResponse,
352 ) -> Result<(), Error> {
353 let entry = self.take_pending(id, expected_session)?;
354 match entry.sender {
355 PendingSender::Question(tx) => {
356 let _ = tx.send(Ok(response));
357 Ok(())
358 }
359 other => {
360 drop(other);
361 Err(Error::Channel(format!(
362 "interaction {id} is not a question interaction"
363 )))
364 }
365 }
366 }
367
368 fn take_pending(
374 &self,
375 id: Uuid,
376 expected_session: Option<Uuid>,
377 ) -> Result<PendingEntry, Error> {
378 let mut pending = self
379 .pending
380 .write()
381 .map_err(|e| Error::Channel(format!("lock poisoned: {e}")))?;
382 let entry = pending
383 .remove(&id)
384 .ok_or_else(|| Error::Channel(format!("no pending interaction with id {id}")))?;
385 if let Some(expected) = expected_session
386 && entry.session_id != expected
387 {
388 return Err(Error::Channel(format!(
389 "interaction {id} does not belong to session {expected} (F-AUTH-5)"
390 )));
391 }
392 Ok(entry)
393 }
394
395 fn cleanup_pending(&self, id: Uuid) {
397 if let Ok(mut pending) = self.pending.write() {
398 pending.remove(&id);
399 }
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    use crate::tool::builtins::{Question, QuestionOption};

    /// Receiving half of the bridge's outbound channel.
    type Outbound = tokio::sync::mpsc::Receiver<OutboundMessage>;

    /// Builds a bridge with the given interaction timeout together with the
    /// receiver for everything it publishes.
    fn make_bridge(timeout: Duration) -> (Arc<InteractionBridge>, Outbound) {
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        (Arc::new(InteractionBridge::new(tx, timeout)), rx)
    }

    /// A single-choice question with two options, shared by the question tests.
    fn make_question_request() -> QuestionRequest {
        QuestionRequest {
            questions: vec![Question {
                question: "Pick a color".into(),
                header: "Color".into(),
                options: vec![
                    QuestionOption {
                        label: "Red".into(),
                        description: "A warm color".into(),
                    },
                    QuestionOption {
                        label: "Blue".into(),
                        description: "A cool color".into(),
                    },
                ],
                multiple: false,
            }],
        }
    }

    /// Receives the next outbound message, which must be `InputNeeded`, and
    /// returns its interaction id.
    async fn next_input_id(rx: &mut Outbound) -> Uuid {
        match rx.recv().await.expect("should receive InputNeeded") {
            OutboundMessage::InputNeeded { interaction_id, .. } => interaction_id,
            other => panic!("expected InputNeeded, got: {other:?}"),
        }
    }

    /// Receives the next outbound message, which must be `ApprovalNeeded`, and
    /// returns its interaction id.
    async fn next_approval_id(rx: &mut Outbound) -> Uuid {
        match rx.recv().await.expect("should receive ApprovalNeeded") {
            OutboundMessage::ApprovalNeeded { interaction_id, .. } => interaction_id,
            other => panic!("expected ApprovalNeeded, got: {other:?}"),
        }
    }

    /// Receives the next outbound message, which must be `QuestionNeeded`, and
    /// returns its interaction id.
    async fn next_question_id(rx: &mut Outbound) -> Uuid {
        match rx.recv().await.expect("should receive QuestionNeeded") {
            OutboundMessage::QuestionNeeded { interaction_id, .. } => interaction_id,
            other => panic!("expected QuestionNeeded, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn text_delta_forwarded() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let session_id = Uuid::new_v4();
        let on_text = bridge.make_on_text(session_id);

        on_text("hello world");

        match rx.recv().await.expect("should receive outbound message") {
            OutboundMessage::TextDelta {
                session_id: sid,
                text,
            } => {
                assert_eq!(sid, session_id);
                assert_eq!(text, "hello world");
            }
            other => panic!("expected TextDelta, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn agent_event_forwarded() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let session_id = Uuid::new_v4();
        let on_event = bridge.make_on_event(session_id);

        let event = AgentEvent::RunStarted {
            agent: "test".into(),
            task: "do stuff".into(),
        };
        // Events are compared through their JSON form, captured before the
        // event is handed to the callback.
        let expected_json = serde_json::to_string(&event).expect("serialize");
        on_event(event);

        match rx.recv().await.expect("should receive outbound message") {
            OutboundMessage::AgentEvent {
                session_id: sid,
                event: received,
            } => {
                assert_eq!(sid, session_id);
                let received_json = serde_json::to_string(&received).expect("serialize");
                assert_eq!(expected_json, received_json);
            }
            other => panic!("expected AgentEvent, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn resolve_input_before_timeout() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_input = bridge.make_on_input(Uuid::new_v4());

        let handle = tokio::spawn(async move { on_input().await });
        let interaction_id = next_input_id(&mut rx).await;

        bridge
            .resolve_input(interaction_id, Some("hello".into()))
            .expect("resolve should succeed");

        let result = handle.await.expect("task should complete");
        assert_eq!(result, Some("hello".into()));
    }

    #[tokio::test]
    async fn input_timeout_returns_none() {
        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
        let on_input = bridge.make_on_input(Uuid::new_v4());

        let handle = tokio::spawn(async move { on_input().await });
        let _interaction_id = next_input_id(&mut rx).await;

        let result = handle.await.expect("task should complete");
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn resolve_approval_before_timeout() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_approval = bridge.make_on_approval(Uuid::new_v4());

        // The approval callback blocks, so it runs on the blocking pool.
        let handle = tokio::task::spawn_blocking(move || on_approval(&[]));
        let interaction_id = next_approval_id(&mut rx).await;

        bridge
            .resolve_approval(interaction_id, ApprovalDecision::Allow)
            .expect("resolve should succeed");

        let result = handle.await.expect("task should complete");
        assert_eq!(result, ApprovalDecision::Allow);
    }

    #[tokio::test]
    async fn approval_timeout_returns_deny() {
        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
        let on_approval = bridge.make_on_approval(Uuid::new_v4());

        let handle = tokio::task::spawn_blocking(move || on_approval(&[]));
        let _interaction_id = next_approval_id(&mut rx).await;

        let result = handle.await.expect("task should complete");
        assert_eq!(result, ApprovalDecision::Deny);
    }

    #[tokio::test]
    async fn resolve_question_before_timeout() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_question = bridge.make_on_question(Uuid::new_v4());

        let request = make_question_request();
        let handle = tokio::spawn(async move { on_question(request).await });
        let interaction_id = next_question_id(&mut rx).await;

        bridge
            .resolve_question(
                interaction_id,
                QuestionResponse {
                    answers: vec![vec!["Red".into()]],
                },
            )
            .expect("resolve should succeed");

        let result = handle.await.expect("task should complete");
        let resp = result.expect("should be Ok");
        assert_eq!(resp.answers, vec![vec!["Red".to_string()]]);
    }

    #[tokio::test]
    async fn question_timeout_returns_error() {
        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
        let on_question = bridge.make_on_question(Uuid::new_v4());

        let request = make_question_request();
        let handle = tokio::spawn(async move { on_question(request).await });
        let _interaction_id = next_question_id(&mut rx).await;

        let result = handle.await.expect("task should complete");
        let err = result.expect_err("should be Err");
        assert!(
            err.to_string().contains("timed out"),
            "error should mention timeout, got: {err}"
        );
    }

    #[tokio::test]
    async fn resolve_unknown_id_returns_error() {
        let (bridge, _rx) = make_bridge(Duration::from_secs(5));

        let err = bridge
            .resolve_input(Uuid::new_v4(), Some("msg".into()))
            .expect_err("should error");
        assert!(
            err.to_string().contains("no pending interaction"),
            "got: {err}"
        );
    }

    #[tokio::test]
    async fn resolve_wrong_type_returns_error() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_input = bridge.make_on_input(Uuid::new_v4());

        let _handle = tokio::spawn(async move { on_input().await });
        let interaction_id = next_input_id(&mut rx).await;

        // An input interaction cannot be answered with an approval decision.
        let err = bridge
            .resolve_approval(interaction_id, ApprovalDecision::Allow)
            .expect_err("should error on wrong type");
        assert!(
            err.to_string().contains("not an approval interaction"),
            "got: {err}"
        );
    }

    #[tokio::test]
    async fn resolve_for_session_accepts_owner() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let session_id = Uuid::new_v4();
        let on_input = bridge.make_on_input(session_id);

        let handle = tokio::spawn(async move { on_input().await });
        let interaction_id = next_input_id(&mut rx).await;

        bridge
            .resolve_input_for_session(Some(session_id), interaction_id, Some("mine".into()))
            .expect("owning session should be allowed to resolve");

        let result = handle.await.expect("task should complete");
        assert_eq!(result, Some("mine".into()));
    }

    #[tokio::test]
    async fn resolve_for_session_rejects_foreign_session() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_input = bridge.make_on_input(Uuid::new_v4());

        let _handle = tokio::spawn(async move { on_input().await });
        let interaction_id = next_input_id(&mut rx).await;

        let foreign_session = Uuid::new_v4();
        let err = bridge
            .resolve_input_for_session(Some(foreign_session), interaction_id, Some("x".into()))
            .expect_err("foreign session must be rejected");
        assert!(
            err.to_string().contains("does not belong to session"),
            "got: {err}"
        );
    }

    #[tokio::test]
    async fn concurrent_interactions() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let session_id = Uuid::new_v4();

        let on_input = bridge.make_on_input(session_id);
        let on_question = bridge.make_on_question(session_id);
        let on_approval = bridge.make_on_approval(session_id);

        let input_handle = tokio::spawn(async move { on_input().await });
        let question_request = make_question_request();
        let question_handle = tokio::spawn(async move { on_question(question_request).await });
        let approval_handle = tokio::task::spawn_blocking(move || on_approval(&[]));

        // The three prompts may arrive in any order.
        let mut input_id = None;
        let mut question_id = None;
        let mut approval_id = None;
        for _ in 0..3 {
            match rx.recv().await.expect("should receive outbound message") {
                OutboundMessage::InputNeeded { interaction_id, .. } => {
                    input_id = Some(interaction_id);
                }
                OutboundMessage::QuestionNeeded { interaction_id, .. } => {
                    question_id = Some(interaction_id);
                }
                OutboundMessage::ApprovalNeeded { interaction_id, .. } => {
                    approval_id = Some(interaction_id);
                }
                other => panic!("unexpected outbound message: {other:?}"),
            }
        }
        let input_id = input_id.expect("should have received InputNeeded");
        let question_id = question_id.expect("should have received QuestionNeeded");
        let approval_id = approval_id.expect("should have received ApprovalNeeded");

        // Resolve in a different order than the callbacks were started.
        bridge
            .resolve_question(
                question_id,
                QuestionResponse {
                    answers: vec![vec!["Blue".into()]],
                },
            )
            .expect("resolve question");
        bridge
            .resolve_approval(approval_id, ApprovalDecision::AlwaysAllow)
            .expect("resolve approval");
        bridge
            .resolve_input(input_id, Some("concurrent input".into()))
            .expect("resolve input");

        let input_result = input_handle.await.expect("input task");
        assert_eq!(input_result, Some("concurrent input".into()));

        let question_result = question_handle.await.expect("question task");
        let resp = question_result.expect("question should be Ok");
        assert_eq!(resp.answers, vec![vec!["Blue".to_string()]]);

        let approval_result = approval_handle.await.expect("approval task");
        assert_eq!(approval_result, ApprovalDecision::AlwaysAllow);
    }

    #[tokio::test]
    async fn text_delta_multiple_sends() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_text = bridge.make_on_text(Uuid::new_v4());

        let chunks = ["chunk1", "chunk2", "chunk3"];
        for chunk in chunks {
            on_text(chunk);
        }

        // Chunks must come out in the order they went in.
        for expected in chunks {
            match rx.recv().await.expect("should receive message") {
                OutboundMessage::TextDelta { text, .. } => assert_eq!(text, expected),
                other => panic!("expected TextDelta, got: {other:?}"),
            }
        }
    }

    #[tokio::test]
    async fn input_resolve_with_none_ends_session() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_input = bridge.make_on_input(Uuid::new_v4());

        let handle = tokio::spawn(async move { on_input().await });
        let interaction_id = next_input_id(&mut rx).await;

        bridge
            .resolve_input(interaction_id, None)
            .expect("resolve should succeed");

        let result = handle.await.expect("task should complete");
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn approval_needed_includes_tool_calls_json() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_approval = bridge.make_on_approval(Uuid::new_v4());

        let tool_calls = vec![crate::llm::types::ToolCall {
            id: "call-1".into(),
            name: "bash".into(),
            input: serde_json::json!({"command": "ls"}),
        }];
        let handle = tokio::task::spawn_blocking(move || on_approval(&tool_calls));

        match rx.recv().await.expect("should receive ApprovalNeeded") {
            OutboundMessage::ApprovalNeeded {
                tool_calls: tc_json,
                interaction_id,
                ..
            } => {
                assert!(tc_json.is_array());
                assert_eq!(tc_json[0]["name"], "bash");
                assert_eq!(tc_json[0]["input"]["command"], "ls");

                bridge
                    .resolve_approval(interaction_id, ApprovalDecision::Deny)
                    .expect("resolve");
            }
            other => panic!("expected ApprovalNeeded, got: {other:?}"),
        }

        let result = handle.await.expect("task should complete");
        assert_eq!(result, ApprovalDecision::Deny);
    }

    #[tokio::test]
    async fn question_needed_includes_request() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_question = bridge.make_on_question(Uuid::new_v4());

        let request = make_question_request();
        let handle = tokio::spawn(async move { on_question(request).await });

        match rx.recv().await.expect("should receive QuestionNeeded") {
            OutboundMessage::QuestionNeeded {
                request,
                interaction_id,
                ..
            } => {
                assert_eq!(request.questions.len(), 1);
                assert_eq!(request.questions[0].question, "Pick a color");
                assert_eq!(request.questions[0].options.len(), 2);

                bridge
                    .resolve_question(
                        interaction_id,
                        QuestionResponse {
                            answers: vec![vec!["Blue".into()]],
                        },
                    )
                    .expect("resolve");
            }
            other => panic!("expected QuestionNeeded, got: {other:?}"),
        }

        let result = handle.await.expect("task should complete");
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn question_channel_closed_returns_error() {
        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
        let on_question = bridge.make_on_question(Uuid::new_v4());

        let request = make_question_request();
        let handle = tokio::spawn(async move { on_question(request).await });
        let interaction_id = next_question_id(&mut rx).await;

        // Removing the entry drops its sender without sending, which closes
        // the channel the callback is waiting on.
        bridge
            .pending
            .write()
            .expect("lock")
            .remove(&interaction_id);

        let result = handle.await.expect("task should complete");
        let err = result.expect_err("should be Err");
        assert!(err.to_string().contains("channel closed"), "got: {err}");
    }

    #[tokio::test]
    async fn resolve_approval_after_timeout_grace_period() {
        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
        let on_approval = bridge.make_on_approval(Uuid::new_v4());

        let handle = tokio::task::spawn_blocking(move || on_approval(&[]));
        let interaction_id = next_approval_id(&mut rx).await;

        let result = handle.await.expect("task should complete");
        assert_eq!(result, ApprovalDecision::Deny);

        // The entry outlives the timeout, so the first late reply is accepted.
        bridge
            .resolve_approval(interaction_id, ApprovalDecision::Allow)
            .expect("late resolve during grace period should succeed");

        let err = bridge
            .resolve_approval(interaction_id, ApprovalDecision::Allow)
            .expect_err("should error after entry consumed");
        assert!(
            err.to_string().contains("no pending interaction"),
            "got: {err}"
        );
    }

    #[tokio::test]
    async fn resolve_question_after_timeout_grace_period() {
        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
        let on_question = bridge.make_on_question(Uuid::new_v4());

        let request = make_question_request();
        let handle = tokio::spawn(async move { on_question(request).await });
        let interaction_id = next_question_id(&mut rx).await;

        let result = handle.await.expect("task should complete");
        let err = result.expect_err("should be Err");
        assert!(err.to_string().contains("timed out"), "got: {err}");

        // The entry outlives the timeout, so the first late reply is accepted.
        bridge
            .resolve_question(
                interaction_id,
                QuestionResponse {
                    answers: vec![vec!["too late".into()]],
                },
            )
            .expect("late resolve during grace period should succeed");

        let err = bridge
            .resolve_question(
                interaction_id,
                QuestionResponse {
                    answers: vec![vec!["really late".into()]],
                },
            )
            .expect_err("should error after entry consumed");
        assert!(
            err.to_string().contains("no pending interaction"),
            "got: {err}"
        );
    }

    #[tokio::test]
    async fn resolve_input_after_timeout_grace_period() {
        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
        let on_input = bridge.make_on_input(Uuid::new_v4());

        let handle = tokio::spawn(async move { on_input().await });
        let interaction_id = next_input_id(&mut rx).await;

        let result = handle.await.expect("task should complete");
        assert_eq!(result, None);

        // The entry outlives the timeout, so the first late reply is accepted.
        bridge
            .resolve_input(interaction_id, Some("too late".into()))
            .expect("late resolve during grace period should succeed");

        let err = bridge
            .resolve_input(interaction_id, Some("really late".into()))
            .expect_err("should error after entry consumed");
        assert!(
            err.to_string().contains("no pending interaction"),
            "got: {err}"
        );
    }
}