Skip to main content

heartbit_core/channel/
bridge.rs

1//! In-process channel bridge connecting the HTTP layer to the agent runtime.
2
3#![allow(missing_docs)]
4use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6use std::time::Duration;
7
8use uuid::Uuid;
9
10use crate::agent::OnInput;
11use crate::agent::events::{AgentEvent, OnEvent};
12use crate::error::Error;
13use crate::llm::{ApprovalDecision, OnApproval, OnText};
14use crate::tool::builtins::{OnQuestion, QuestionRequest, QuestionResponse};
15
/// Messages sent from bridge to the WS handler for forwarding to the client.
///
/// Every variant except [`OutboundMessage::RawFrame`] carries the
/// `session_id` it was produced for. The `*Needed` variants additionally
/// carry the `interaction_id` under which the bridge registered the pending
/// interaction; that id is what the matching `resolve_*` method expects.
#[derive(Debug, Clone)]
pub enum OutboundMessage {
    /// Text chunk emitted by the callback built in `make_on_text`.
    TextDelta {
        session_id: Uuid,
        text: String,
    },
    /// Agent event emitted by the callback built in `make_on_event`.
    AgentEvent {
        session_id: Uuid,
        event: AgentEvent,
    },
    /// The agent is waiting for user input (see `make_on_input`).
    InputNeeded {
        session_id: Uuid,
        interaction_id: Uuid,
    },
    /// The agent is waiting for a tool-call approval (see `make_on_approval`).
    ApprovalNeeded {
        session_id: Uuid,
        interaction_id: Uuid,
        /// The tool calls awaiting approval, serialized with `serde_json`.
        tool_calls: serde_json::Value,
    },
    /// The agent is waiting for answers to a question (see `make_on_question`).
    QuestionNeeded {
        session_id: Uuid,
        interaction_id: Uuid,
        request: QuestionRequest,
    },
    /// Agent run completed successfully.
    ChatFinal {
        session_id: Uuid,
        result: String,
    },
    /// Agent run failed with an error.
    ChatError {
        session_id: Uuid,
        error: String,
    },
    /// Pre-built WS frame (e.g., method responses) to send as-is.
    RawFrame(crate::channel::types::WsFrame),
}
54
/// Sender half for pending interactions, paired with the session_id that
/// owns the interaction.
///
/// SECURITY (F-AUTH-5): the session_id is stored alongside the sender so
/// `resolve_*_for_session` can verify the resolving client owns the
/// interaction. UUID v4 (122 bits) makes guessing infeasible, but if the
/// id ever leaks (logs, metrics, debug output), the session check provides
/// a second line of defense.
struct PendingEntry {
    // Session that created the interaction; compared against the resolver's
    // session in `take_pending`.
    session_id: Uuid,
    // Channel end used to hand the client's answer back to the waiting callback.
    sender: PendingSender,
}
67
/// Reply channel for one pending interaction, tagged by interaction kind.
///
/// The `resolve_*` methods match on the variant so that an id can only be
/// resolved with the kind of answer its callback is waiting for.
enum PendingSender {
    /// Reply channel for `make_on_input`; `None` is a valid answer.
    Input(tokio::sync::oneshot::Sender<Option<String>>),
    /// Reply channel for `make_on_approval`. A `std::sync::mpsc` sender is
    /// used because that callback is synchronous and blocks in `recv_timeout`.
    Approval(std::sync::mpsc::Sender<ApprovalDecision>),
    /// Reply channel for `make_on_question`.
    Question(tokio::sync::oneshot::Sender<Result<QuestionResponse, Error>>),
}
73
/// Grace period after timeout before cleaning up a pending interaction entry.
///
/// During this window, a late resolve from the client succeeds silently:
/// the receiving half of the reply channel is already dropped, so the send
/// itself fails, but the resolve call ignores that and returns `Ok`. Once the
/// entry has been removed, the same resolve fails with
/// "no pending interaction".
const GRACE_PERIOD: Duration = Duration::from_secs(15);
78
/// The interaction bridge translating heartbit callbacks to WS frames.
///
/// Maintains a map of pending interactions and an outbound channel for
/// pushing events to the WebSocket handler.
pub struct InteractionBridge {
    // In-flight interactions keyed by interaction id. Entries are inserted by
    // the `make_on_*` callbacks and removed by `take_pending` / `cleanup_pending`.
    pending: RwLock<HashMap<Uuid, PendingEntry>>,
    // Channel on which every `OutboundMessage` is pushed (always via `try_send`).
    outbound: tokio::sync::mpsc::Sender<OutboundMessage>,
    // How long an interaction callback waits for the client before giving up.
    timeout: Duration,
}
88
89impl InteractionBridge {
90    /// Create a new bridge with the given outbound sender and interaction timeout.
91    pub fn new(outbound: tokio::sync::mpsc::Sender<OutboundMessage>, timeout: Duration) -> Self {
92        Self {
93            pending: RwLock::new(HashMap::new()),
94            outbound,
95            timeout,
96        }
97    }
98
99    /// Create an `OnText` callback that forwards text deltas to the outbound channel.
100    pub fn make_on_text(self: &Arc<Self>, session_id: Uuid) -> Arc<OnText> {
101        let outbound = self.outbound.clone();
102        Arc::new(move |text: &str| {
103            let _ = outbound.try_send(OutboundMessage::TextDelta {
104                session_id,
105                text: text.to_string(),
106            });
107        })
108    }
109
110    /// Create an `OnEvent` callback that forwards agent events to the outbound channel.
111    pub fn make_on_event(self: &Arc<Self>, session_id: Uuid) -> Arc<OnEvent> {
112        let outbound = self.outbound.clone();
113        Arc::new(move |event: AgentEvent| {
114            let _ = outbound.try_send(OutboundMessage::AgentEvent { session_id, event });
115        })
116    }
117
118    /// Create an `OnInput` callback using oneshot rendezvous.
119    ///
120    /// When called, sends `InputNeeded` on the outbound channel and waits
121    /// for the client to resolve. Returns `None` on timeout (SOLICITATION semantic).
122    pub fn make_on_input(self: &Arc<Self>, session_id: Uuid) -> Arc<OnInput> {
123        let bridge = Arc::clone(self);
124        Arc::new(move || {
125            let bridge = Arc::clone(&bridge);
126            Box::pin(async move {
127                let interaction_id = Uuid::new_v4();
128                let (tx, rx) = tokio::sync::oneshot::channel();
129
130                // Store the sender
131                {
132                    let mut pending = bridge.pending.write().expect("pending lock not poisoned");
133                    pending.insert(
134                        interaction_id,
135                        PendingEntry {
136                            session_id,
137                            sender: PendingSender::Input(tx),
138                        },
139                    );
140                }
141
142                // Send outbound event
143                let _ = bridge.outbound.try_send(OutboundMessage::InputNeeded {
144                    session_id,
145                    interaction_id,
146                });
147
148                // Await with timeout
149                match tokio::time::timeout(bridge.timeout, rx).await {
150                    Ok(Ok(msg)) => msg,
151                    _ => {
152                        // Timeout or channel closed -> deferred cleanup (grace period
153                        // allows late resolves from the client to succeed silently).
154                        let cleanup_bridge = Arc::clone(&bridge);
155                        tokio::spawn(async move {
156                            tokio::time::sleep(GRACE_PERIOD).await;
157                            cleanup_bridge.cleanup_pending(interaction_id);
158                        });
159                        None
160                    }
161                }
162            })
163        })
164    }
165
166    /// Create an `OnApproval` callback using `std::sync::mpsc` rendezvous.
167    ///
168    /// `OnApproval` is synchronous, so we use `std::sync::mpsc::recv_timeout()`
169    /// instead of async oneshot. Returns `Deny` on timeout (PERMISSION semantic).
170    pub fn make_on_approval(self: &Arc<Self>, session_id: Uuid) -> Arc<OnApproval> {
171        let bridge = Arc::clone(self);
172        Arc::new(move |tool_calls: &[crate::llm::types::ToolCall]| {
173            let interaction_id = Uuid::new_v4();
174            let (tx, rx) = std::sync::mpsc::channel();
175
176            // Serialize tool calls for the outbound message
177            let tool_calls_json = serde_json::to_value(tool_calls).unwrap_or_default();
178
179            // Store the sender
180            {
181                let mut pending = bridge.pending.write().expect("pending lock not poisoned");
182                pending.insert(
183                    interaction_id,
184                    PendingEntry {
185                        session_id,
186                        sender: PendingSender::Approval(tx),
187                    },
188                );
189            }
190
191            // Send outbound event (non-blocking try_send)
192            let _ = bridge.outbound.try_send(OutboundMessage::ApprovalNeeded {
193                session_id,
194                interaction_id,
195                tool_calls: tool_calls_json,
196            });
197
198            // Block on recv_timeout
199            match rx.recv_timeout(bridge.timeout) {
200                Ok(decision) => decision,
201                Err(_) => {
202                    // Timeout or disconnected -> deny (safe default).
203                    // Deferred cleanup: grace period allows late client resolves.
204                    // Note: we're in a sync context so we spawn via Handle.
205                    let cleanup_bridge = Arc::clone(&bridge);
206                    if let Ok(handle) = tokio::runtime::Handle::try_current() {
207                        handle.spawn(async move {
208                            tokio::time::sleep(GRACE_PERIOD).await;
209                            cleanup_bridge.cleanup_pending(interaction_id);
210                        });
211                    } else {
212                        bridge.cleanup_pending(interaction_id);
213                    }
214                    ApprovalDecision::Deny
215                }
216            }
217        })
218    }
219
220    /// Create an `OnQuestion` callback using oneshot rendezvous.
221    ///
222    /// Returns `Error::Channel("timeout")` on timeout (CLARIFICATION semantic).
223    pub fn make_on_question(self: &Arc<Self>, session_id: Uuid) -> Arc<OnQuestion> {
224        let bridge = Arc::clone(self);
225        Arc::new(move |request: QuestionRequest| {
226            let bridge = Arc::clone(&bridge);
227            let request_clone = request.clone();
228            Box::pin(async move {
229                let interaction_id = Uuid::new_v4();
230                let (tx, rx) = tokio::sync::oneshot::channel();
231
232                // Store the sender
233                {
234                    let mut pending = bridge.pending.write().expect("pending lock not poisoned");
235                    pending.insert(
236                        interaction_id,
237                        PendingEntry {
238                            session_id,
239                            sender: PendingSender::Question(tx),
240                        },
241                    );
242                }
243
244                // Send outbound event
245                let _ = bridge.outbound.try_send(OutboundMessage::QuestionNeeded {
246                    session_id,
247                    interaction_id,
248                    request: request_clone,
249                });
250
251                // Await with timeout
252                match tokio::time::timeout(bridge.timeout, rx).await {
253                    Ok(Ok(result)) => result,
254                    Ok(Err(_)) => {
255                        // Channel closed — deferred cleanup with grace period.
256                        let cleanup_bridge = Arc::clone(&bridge);
257                        tokio::spawn(async move {
258                            tokio::time::sleep(GRACE_PERIOD).await;
259                            cleanup_bridge.cleanup_pending(interaction_id);
260                        });
261                        Err(Error::Channel("interaction channel closed".into()))
262                    }
263                    Err(_) => {
264                        // Timeout — deferred cleanup with grace period.
265                        let cleanup_bridge = Arc::clone(&bridge);
266                        tokio::spawn(async move {
267                            tokio::time::sleep(GRACE_PERIOD).await;
268                            cleanup_bridge.cleanup_pending(interaction_id);
269                        });
270                        Err(Error::Channel("interaction timed out".into()))
271                    }
272                }
273            })
274        })
275    }
276
277    // --- Resolve methods ---
278
    /// Resolve a pending input interaction without a session-ownership check.
    ///
    /// Delegates to `resolve_input_for_session` with `expected_session = None`,
    /// so the entry's session is not compared against anything.
    ///
    /// SECURITY (F-AUTH-5): network handlers that know the resolving frame's
    /// session id should call `resolve_input_for_session` with `Some(..)`
    /// instead, which verifies that the pending entry belongs to that session
    /// before delivering the result.
    ///
    /// # Errors
    ///
    /// Returns `Error::Channel` if no interaction with `id` is pending, if the
    /// pending-interaction lock is poisoned, or if `id` refers to an
    /// interaction that is not an input interaction.
    pub fn resolve_input(&self, id: Uuid, message: Option<String>) -> Result<(), Error> {
        self.resolve_input_for_session(None, id, message)
    }
288
289    /// Like [`resolve_input`] but verifies the interaction belongs to
290    /// `expected_session` before resolving (F-AUTH-5).
291    pub fn resolve_input_for_session(
292        &self,
293        expected_session: Option<Uuid>,
294        id: Uuid,
295        message: Option<String>,
296    ) -> Result<(), Error> {
297        let entry = self.take_pending(id, expected_session)?;
298        match entry.sender {
299            PendingSender::Input(tx) => {
300                let _ = tx.send(message);
301                Ok(())
302            }
303            other => {
304                drop(other);
305                Err(Error::Channel(format!(
306                    "interaction {id} is not an input interaction"
307                )))
308            }
309        }
310    }
311
    /// Resolve a pending approval interaction without a session-ownership check.
    ///
    /// Delegates to `resolve_approval_for_session` with
    /// `expected_session = None`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Channel` if no interaction with `id` is pending, if the
    /// pending-interaction lock is poisoned, or if `id` refers to an
    /// interaction that is not an approval interaction.
    pub fn resolve_approval(&self, id: Uuid, decision: ApprovalDecision) -> Result<(), Error> {
        self.resolve_approval_for_session(None, id, decision)
    }
316
317    /// Like [`resolve_approval`] but verifies the interaction belongs to
318    /// `expected_session` (F-AUTH-5).
319    pub fn resolve_approval_for_session(
320        &self,
321        expected_session: Option<Uuid>,
322        id: Uuid,
323        decision: ApprovalDecision,
324    ) -> Result<(), Error> {
325        let entry = self.take_pending(id, expected_session)?;
326        match entry.sender {
327            PendingSender::Approval(tx) => {
328                let _ = tx.send(decision);
329                Ok(())
330            }
331            other => {
332                drop(other);
333                Err(Error::Channel(format!(
334                    "interaction {id} is not an approval interaction"
335                )))
336            }
337        }
338    }
339
    /// Resolve a pending question interaction without a session-ownership check.
    ///
    /// Delegates to `resolve_question_for_session` with
    /// `expected_session = None`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Channel` if no interaction with `id` is pending, if the
    /// pending-interaction lock is poisoned, or if `id` refers to an
    /// interaction that is not a question interaction.
    pub fn resolve_question(&self, id: Uuid, response: QuestionResponse) -> Result<(), Error> {
        self.resolve_question_for_session(None, id, response)
    }
344
345    /// Like [`resolve_question`] but verifies the interaction belongs to
346    /// `expected_session` (F-AUTH-5).
347    pub fn resolve_question_for_session(
348        &self,
349        expected_session: Option<Uuid>,
350        id: Uuid,
351        response: QuestionResponse,
352    ) -> Result<(), Error> {
353        let entry = self.take_pending(id, expected_session)?;
354        match entry.sender {
355            PendingSender::Question(tx) => {
356                let _ = tx.send(Ok(response));
357                Ok(())
358            }
359            other => {
360                drop(other);
361                Err(Error::Channel(format!(
362                    "interaction {id} is not a question interaction"
363                )))
364            }
365        }
366    }
367
368    /// Remove and return a pending interaction, or error if not found.
369    /// When `expected_session` is `Some`, also enforces session ownership
370    /// (F-AUTH-5). The entry is consumed even on session mismatch — this
371    /// is intentional: a mismatched resolve aborts the pending interaction
372    /// to prevent replay.
373    fn take_pending(
374        &self,
375        id: Uuid,
376        expected_session: Option<Uuid>,
377    ) -> Result<PendingEntry, Error> {
378        let mut pending = self
379            .pending
380            .write()
381            .map_err(|e| Error::Channel(format!("lock poisoned: {e}")))?;
382        let entry = pending
383            .remove(&id)
384            .ok_or_else(|| Error::Channel(format!("no pending interaction with id {id}")))?;
385        if let Some(expected) = expected_session
386            && entry.session_id != expected
387        {
388            return Err(Error::Channel(format!(
389                "interaction {id} does not belong to session {expected} (F-AUTH-5)"
390            )));
391        }
392        Ok(entry)
393    }
394
395    /// Remove a pending interaction without error (cleanup after timeout).
396    fn cleanup_pending(&self, id: Uuid) {
397        if let Ok(mut pending) = self.pending.write() {
398            pending.remove(&id);
399        }
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406
407    use crate::tool::builtins::{Question, QuestionOption};
408
409    fn make_bridge(
410        timeout: Duration,
411    ) -> (
412        Arc<InteractionBridge>,
413        tokio::sync::mpsc::Receiver<OutboundMessage>,
414    ) {
415        let (tx, rx) = tokio::sync::mpsc::channel(16);
416        let bridge = Arc::new(InteractionBridge::new(tx, timeout));
417        (bridge, rx)
418    }
419
420    fn make_question_request() -> QuestionRequest {
421        QuestionRequest {
422            questions: vec![Question {
423                question: "Pick a color".into(),
424                header: "Color".into(),
425                options: vec![
426                    QuestionOption {
427                        label: "Red".into(),
428                        description: "A warm color".into(),
429                    },
430                    QuestionOption {
431                        label: "Blue".into(),
432                        description: "A cool color".into(),
433                    },
434                ],
435                multiple: false,
436            }],
437        }
438    }
439
440    // --- 1. text_delta_forwarded ---
441
442    #[tokio::test]
443    async fn text_delta_forwarded() {
444        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
445        let session_id = Uuid::new_v4();
446        let on_text = bridge.make_on_text(session_id);
447
448        on_text("hello world");
449
450        let msg = rx.recv().await.expect("should receive outbound message");
451        match msg {
452            OutboundMessage::TextDelta {
453                session_id: sid,
454                text,
455            } => {
456                assert_eq!(sid, session_id);
457                assert_eq!(text, "hello world");
458            }
459            other => panic!("expected TextDelta, got: {other:?}"),
460        }
461    }
462
463    // --- 2. agent_event_forwarded ---
464
465    #[tokio::test]
466    async fn agent_event_forwarded() {
467        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
468        let session_id = Uuid::new_v4();
469        let on_event = bridge.make_on_event(session_id);
470
471        let event = AgentEvent::RunStarted {
472            agent: "test".into(),
473            task: "do stuff".into(),
474        };
475        on_event(event.clone());
476
477        let msg = rx.recv().await.expect("should receive outbound message");
478        match msg {
479            OutboundMessage::AgentEvent {
480                session_id: sid,
481                event: received,
482            } => {
483                assert_eq!(sid, session_id);
484                // Verify the event roundtripped via serde
485                let expected_json = serde_json::to_string(&AgentEvent::RunStarted {
486                    agent: "test".into(),
487                    task: "do stuff".into(),
488                })
489                .expect("serialize");
490                let received_json = serde_json::to_string(&received).expect("serialize");
491                assert_eq!(expected_json, received_json);
492            }
493            other => panic!("expected AgentEvent, got: {other:?}"),
494        }
495    }
496
497    // --- 3. resolve_input_before_timeout ---
498
499    #[tokio::test]
500    async fn resolve_input_before_timeout() {
501        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
502        let session_id = Uuid::new_v4();
503        let on_input = bridge.make_on_input(session_id);
504
505        // Spawn the callback
506        let handle = tokio::spawn(async move { on_input().await });
507
508        // Wait for the outbound message
509        let msg = rx.recv().await.expect("should receive InputNeeded");
510        let interaction_id = match msg {
511            OutboundMessage::InputNeeded { interaction_id, .. } => interaction_id,
512            other => panic!("expected InputNeeded, got: {other:?}"),
513        };
514
515        // Resolve
516        bridge
517            .resolve_input(interaction_id, Some("hello".into()))
518            .expect("resolve should succeed");
519
520        // Verify
521        let result = handle.await.expect("task should complete");
522        assert_eq!(result, Some("hello".into()));
523    }
524
525    // --- 4. input_timeout_returns_none ---
526
527    #[tokio::test]
528    async fn input_timeout_returns_none() {
529        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
530        let session_id = Uuid::new_v4();
531        let on_input = bridge.make_on_input(session_id);
532
533        let handle = tokio::spawn(async move { on_input().await });
534
535        // Consume the outbound message but don't resolve
536        let _msg = rx.recv().await.expect("should receive InputNeeded");
537
538        // Verify timeout returns None
539        let result = handle.await.expect("task should complete");
540        assert_eq!(result, None);
541    }
542
543    // --- 5. resolve_approval_before_timeout ---
544
545    #[tokio::test]
546    async fn resolve_approval_before_timeout() {
547        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
548        let session_id = Uuid::new_v4();
549        let on_approval = bridge.make_on_approval(session_id);
550
551        // Spawn on blocking thread (OnApproval is sync + blocks)
552        let handle = tokio::task::spawn_blocking(move || on_approval(&[]));
553
554        // Wait for outbound message
555        let msg = rx.recv().await.expect("should receive ApprovalNeeded");
556        let interaction_id = match msg {
557            OutboundMessage::ApprovalNeeded { interaction_id, .. } => interaction_id,
558            other => panic!("expected ApprovalNeeded, got: {other:?}"),
559        };
560
561        // Resolve
562        bridge
563            .resolve_approval(interaction_id, ApprovalDecision::Allow)
564            .expect("resolve should succeed");
565
566        // Verify
567        let result = handle.await.expect("task should complete");
568        assert_eq!(result, ApprovalDecision::Allow);
569    }
570
571    // --- 6. approval_timeout_returns_deny ---
572
573    #[tokio::test]
574    async fn approval_timeout_returns_deny() {
575        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
576        let session_id = Uuid::new_v4();
577        let on_approval = bridge.make_on_approval(session_id);
578
579        let handle = tokio::task::spawn_blocking(move || on_approval(&[]));
580
581        // Consume outbound but don't resolve
582        let _msg = rx.recv().await.expect("should receive ApprovalNeeded");
583
584        // Verify timeout returns Deny
585        let result = handle.await.expect("task should complete");
586        assert_eq!(result, ApprovalDecision::Deny);
587    }
588
589    // --- 7. resolve_question_before_timeout ---
590
591    #[tokio::test]
592    async fn resolve_question_before_timeout() {
593        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
594        let session_id = Uuid::new_v4();
595        let on_question = bridge.make_on_question(session_id);
596
597        let request = make_question_request();
598        let handle = tokio::spawn(async move { on_question(request).await });
599
600        // Wait for outbound message
601        let msg = rx.recv().await.expect("should receive QuestionNeeded");
602        let interaction_id = match msg {
603            OutboundMessage::QuestionNeeded { interaction_id, .. } => interaction_id,
604            other => panic!("expected QuestionNeeded, got: {other:?}"),
605        };
606
607        // Resolve
608        let response = QuestionResponse {
609            answers: vec![vec!["Red".into()]],
610        };
611        bridge
612            .resolve_question(interaction_id, response)
613            .expect("resolve should succeed");
614
615        // Verify
616        let result = handle.await.expect("task should complete");
617        let resp = result.expect("should be Ok");
618        assert_eq!(resp.answers, vec![vec!["Red".to_string()]]);
619    }
620
621    // --- 8. question_timeout_returns_error ---
622
623    #[tokio::test]
624    async fn question_timeout_returns_error() {
625        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
626        let session_id = Uuid::new_v4();
627        let on_question = bridge.make_on_question(session_id);
628
629        let request = make_question_request();
630        let handle = tokio::spawn(async move { on_question(request).await });
631
632        // Consume outbound but don't resolve
633        let _msg = rx.recv().await.expect("should receive QuestionNeeded");
634
635        // Verify timeout returns Err(Channel)
636        let result = handle.await.expect("task should complete");
637        let err = result.expect_err("should be Err");
638        assert!(
639            err.to_string().contains("timed out"),
640            "error should mention timeout, got: {err}"
641        );
642    }
643
644    // --- 9. resolve_unknown_id_returns_error ---
645
646    #[tokio::test]
647    async fn resolve_unknown_id_returns_error() {
648        let (bridge, _rx) = make_bridge(Duration::from_secs(5));
649        let bogus_id = Uuid::new_v4();
650
651        let err = bridge
652            .resolve_input(bogus_id, Some("msg".into()))
653            .expect_err("should error");
654        assert!(
655            err.to_string().contains("no pending interaction"),
656            "got: {err}"
657        );
658    }
659
660    // --- 10. resolve_wrong_type_returns_error ---
661
662    #[tokio::test]
663    async fn resolve_wrong_type_returns_error() {
664        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
665        let session_id = Uuid::new_v4();
666        let on_input = bridge.make_on_input(session_id);
667
668        // Start an input interaction
669        let _handle = tokio::spawn(async move { on_input().await });
670
671        // Get the interaction ID
672        let msg = rx.recv().await.expect("should receive InputNeeded");
673        let interaction_id = match msg {
674            OutboundMessage::InputNeeded { interaction_id, .. } => interaction_id,
675            other => panic!("expected InputNeeded, got: {other:?}"),
676        };
677
678        // Try resolving as approval (wrong type)
679        let err = bridge
680            .resolve_approval(interaction_id, ApprovalDecision::Allow)
681            .expect_err("should error on wrong type");
682        assert!(
683            err.to_string().contains("not an approval interaction"),
684            "got: {err}"
685        );
686    }
687
688    // --- 11. concurrent_interactions ---
689
690    #[tokio::test]
691    async fn concurrent_interactions() {
692        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
693        let session_id = Uuid::new_v4();
694
695        // Create three different interaction types
696        let on_input = bridge.make_on_input(session_id);
697        let on_question = bridge.make_on_question(session_id);
698        let on_approval = bridge.make_on_approval(session_id);
699
700        // Spawn all three
701        let input_handle = tokio::spawn(async move { on_input().await });
702        let question_handle = {
703            let req = make_question_request();
704            tokio::spawn(async move { on_question(req).await })
705        };
706        let approval_handle = tokio::task::spawn_blocking(move || on_approval(&[]));
707
708        // Collect all three outbound messages (order not guaranteed)
709        let mut input_id = None;
710        let mut question_id = None;
711        let mut approval_id = None;
712
713        for _ in 0..3 {
714            let msg = rx.recv().await.expect("should receive outbound message");
715            match msg {
716                OutboundMessage::InputNeeded { interaction_id, .. } => {
717                    input_id = Some(interaction_id)
718                }
719                OutboundMessage::QuestionNeeded { interaction_id, .. } => {
720                    question_id = Some(interaction_id)
721                }
722                OutboundMessage::ApprovalNeeded { interaction_id, .. } => {
723                    approval_id = Some(interaction_id)
724                }
725                other => panic!("unexpected outbound message: {other:?}"),
726            }
727        }
728
729        let input_id = input_id.expect("should have received InputNeeded");
730        let question_id = question_id.expect("should have received QuestionNeeded");
731        let approval_id = approval_id.expect("should have received ApprovalNeeded");
732
733        // Resolve in reverse order (question, approval, input)
734        bridge
735            .resolve_question(
736                question_id,
737                QuestionResponse {
738                    answers: vec![vec!["Blue".into()]],
739                },
740            )
741            .expect("resolve question");
742        bridge
743            .resolve_approval(approval_id, ApprovalDecision::AlwaysAllow)
744            .expect("resolve approval");
745        bridge
746            .resolve_input(input_id, Some("concurrent input".into()))
747            .expect("resolve input");
748
749        // Verify all three
750        let input_result = input_handle.await.expect("input task");
751        assert_eq!(input_result, Some("concurrent input".into()));
752
753        let question_result = question_handle.await.expect("question task");
754        let resp = question_result.expect("question should be Ok");
755        assert_eq!(resp.answers, vec![vec!["Blue".to_string()]]);
756
757        let approval_result = approval_handle.await.expect("approval task");
758        assert_eq!(approval_result, ApprovalDecision::AlwaysAllow);
759    }
760
761    // --- Additional edge case tests ---
762
763    #[tokio::test]
764    async fn text_delta_multiple_sends() {
765        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
766        let session_id = Uuid::new_v4();
767        let on_text = bridge.make_on_text(session_id);
768
769        on_text("chunk1");
770        on_text("chunk2");
771        on_text("chunk3");
772
773        for expected in &["chunk1", "chunk2", "chunk3"] {
774            let msg = rx.recv().await.expect("should receive message");
775            match msg {
776                OutboundMessage::TextDelta { text, .. } => assert_eq!(text, *expected),
777                other => panic!("expected TextDelta, got: {other:?}"),
778            }
779        }
780    }
781
782    #[tokio::test]
783    async fn input_resolve_with_none_ends_session() {
784        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
785        let session_id = Uuid::new_v4();
786        let on_input = bridge.make_on_input(session_id);
787
788        let handle = tokio::spawn(async move { on_input().await });
789
790        let msg = rx.recv().await.expect("should receive InputNeeded");
791        let interaction_id = match msg {
792            OutboundMessage::InputNeeded { interaction_id, .. } => interaction_id,
793            other => panic!("expected InputNeeded, got: {other:?}"),
794        };
795
796        // Resolve with None (end session)
797        bridge
798            .resolve_input(interaction_id, None)
799            .expect("resolve should succeed");
800
801        let result = handle.await.expect("task should complete");
802        assert_eq!(result, None);
803    }
804
805    #[tokio::test]
806    async fn approval_needed_includes_tool_calls_json() {
807        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
808        let session_id = Uuid::new_v4();
809        let on_approval = bridge.make_on_approval(session_id);
810
811        let tool_calls = vec![crate::llm::types::ToolCall {
812            id: "call-1".into(),
813            name: "bash".into(),
814            input: serde_json::json!({"command": "ls"}),
815        }];
816
817        let tool_calls_for_thread = tool_calls.clone();
818        let handle = tokio::task::spawn_blocking(move || on_approval(&tool_calls_for_thread));
819
820        let msg = rx.recv().await.expect("should receive ApprovalNeeded");
821        match &msg {
822            OutboundMessage::ApprovalNeeded {
823                tool_calls: tc_json,
824                interaction_id,
825                ..
826            } => {
827                // Verify tool calls are serialized correctly
828                assert!(tc_json.is_array());
829                assert_eq!(tc_json[0]["name"], "bash");
830                assert_eq!(tc_json[0]["input"]["command"], "ls");
831
832                // Resolve to unblock the thread
833                bridge
834                    .resolve_approval(*interaction_id, ApprovalDecision::Deny)
835                    .expect("resolve");
836            }
837            other => panic!("expected ApprovalNeeded, got: {other:?}"),
838        }
839
840        let result = handle.await.expect("task should complete");
841        assert_eq!(result, ApprovalDecision::Deny);
842    }
843
844    #[tokio::test]
845    async fn question_needed_includes_request() {
846        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
847        let session_id = Uuid::new_v4();
848        let on_question = bridge.make_on_question(session_id);
849
850        let request = make_question_request();
851        let handle = tokio::spawn(async move { on_question(request).await });
852
853        let msg = rx.recv().await.expect("should receive QuestionNeeded");
854        match &msg {
855            OutboundMessage::QuestionNeeded {
856                request,
857                interaction_id,
858                ..
859            } => {
860                assert_eq!(request.questions.len(), 1);
861                assert_eq!(request.questions[0].question, "Pick a color");
862                assert_eq!(request.questions[0].options.len(), 2);
863
864                bridge
865                    .resolve_question(
866                        *interaction_id,
867                        QuestionResponse {
868                            answers: vec![vec!["Blue".into()]],
869                        },
870                    )
871                    .expect("resolve");
872            }
873            other => panic!("expected QuestionNeeded, got: {other:?}"),
874        }
875
876        let result = handle.await.expect("task should complete");
877        assert!(result.is_ok());
878    }
879
880    #[tokio::test]
881    async fn question_channel_closed_returns_error() {
882        let (bridge, mut rx) = make_bridge(Duration::from_secs(5));
883        let session_id = Uuid::new_v4();
884        let on_question = bridge.make_on_question(session_id);
885
886        let request = make_question_request();
887        let handle = tokio::spawn(async move { on_question(request).await });
888
889        // Get interaction ID, then manually drop the sender to simulate channel close
890        let msg = rx.recv().await.expect("should receive QuestionNeeded");
891        let interaction_id = match msg {
892            OutboundMessage::QuestionNeeded { interaction_id, .. } => interaction_id,
893            other => panic!("expected QuestionNeeded, got: {other:?}"),
894        };
895
896        // Take the pending sender and drop it (simulates channel close)
897        {
898            let mut pending = bridge.pending.write().expect("lock");
899            pending.remove(&interaction_id);
900            // Sender is dropped here
901        }
902
903        let result = handle.await.expect("task should complete");
904        let err = result.expect_err("should be Err");
905        assert!(err.to_string().contains("channel closed"), "got: {err}");
906    }
907
908    #[tokio::test]
909    async fn resolve_approval_after_timeout_grace_period() {
910        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
911        let session_id = Uuid::new_v4();
912        let on_approval = bridge.make_on_approval(session_id);
913
914        let handle = tokio::task::spawn_blocking(move || on_approval(&[]));
915
916        let msg = rx.recv().await.expect("should receive ApprovalNeeded");
917        let interaction_id = match msg {
918            OutboundMessage::ApprovalNeeded { interaction_id, .. } => interaction_id,
919            other => panic!("expected ApprovalNeeded, got: {other:?}"),
920        };
921
922        // Wait for timeout
923        let result = handle.await.expect("task should complete");
924        assert_eq!(result, ApprovalDecision::Deny);
925
926        // Late resolve during grace period should succeed (entry still present).
927        bridge
928            .resolve_approval(interaction_id, ApprovalDecision::Allow)
929            .expect("late resolve during grace period should succeed");
930
931        // Second resolve should fail (entry consumed by first resolve)
932        let err = bridge
933            .resolve_approval(interaction_id, ApprovalDecision::Allow)
934            .expect_err("should error after entry consumed");
935        assert!(
936            err.to_string().contains("no pending interaction"),
937            "got: {err}"
938        );
939    }
940
941    #[tokio::test]
942    async fn resolve_question_after_timeout_grace_period() {
943        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
944        let session_id = Uuid::new_v4();
945        let on_question = bridge.make_on_question(session_id);
946
947        let request = make_question_request();
948        let handle = tokio::spawn(async move { on_question(request).await });
949
950        let msg = rx.recv().await.expect("should receive QuestionNeeded");
951        let interaction_id = match msg {
952            OutboundMessage::QuestionNeeded { interaction_id, .. } => interaction_id,
953            other => panic!("expected QuestionNeeded, got: {other:?}"),
954        };
955
956        // Wait for timeout
957        let result = handle.await.expect("task should complete");
958        let err = result.expect_err("should be Err");
959        assert!(err.to_string().contains("timed out"), "got: {err}");
960
961        // Late resolve during grace period should succeed (entry still present).
962        bridge
963            .resolve_question(
964                interaction_id,
965                QuestionResponse {
966                    answers: vec![vec!["too late".into()]],
967                },
968            )
969            .expect("late resolve during grace period should succeed");
970
971        // Second resolve should fail (entry consumed by first resolve)
972        let err = bridge
973            .resolve_question(
974                interaction_id,
975                QuestionResponse {
976                    answers: vec![vec!["really late".into()]],
977                },
978            )
979            .expect_err("should error after entry consumed");
980        assert!(
981            err.to_string().contains("no pending interaction"),
982            "got: {err}"
983        );
984    }
985
986    #[tokio::test]
987    async fn resolve_input_after_timeout_grace_period() {
988        let (bridge, mut rx) = make_bridge(Duration::from_millis(10));
989        let session_id = Uuid::new_v4();
990        let on_input = bridge.make_on_input(session_id);
991
992        let handle = tokio::spawn(async move { on_input().await });
993
994        let msg = rx.recv().await.expect("should receive InputNeeded");
995        let interaction_id = match msg {
996            OutboundMessage::InputNeeded { interaction_id, .. } => interaction_id,
997            other => panic!("expected InputNeeded, got: {other:?}"),
998        };
999
1000        // Wait for timeout
1001        let result = handle.await.expect("task should complete");
1002        assert_eq!(result, None);
1003
1004        // Late resolve during grace period should succeed (entry still present).
1005        // The oneshot receiver is already dropped, so the send is a no-op, but
1006        // the resolve call itself returns Ok.
1007        bridge
1008            .resolve_input(interaction_id, Some("too late".into()))
1009            .expect("late resolve during grace period should succeed");
1010
1011        // Second resolve should fail (entry consumed by first resolve)
1012        let err = bridge
1013            .resolve_input(interaction_id, Some("really late".into()))
1014            .expect_err("should error after entry consumed");
1015        assert!(
1016            err.to_string().contains("no pending interaction"),
1017            "got: {err}"
1018        );
1019    }
1020}