1use std::collections::{HashMap, HashSet, VecDeque};
15use std::fmt;
16use std::net::{IpAddr, SocketAddr};
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, LazyLock};
20use std::time::{Duration, Instant};
21
22use serde::Deserialize;
23use serde_json::{json, Value};
24
25use crate::config::Config;
26use crate::config_resolve::ConfigTier;
27use crate::context::{App, AppContext, ProgressSender};
28use crate::executor::{Executor, Lane};
29use crate::jsonc::strip_jsonc;
30use crate::log_ctx;
31use crate::path_identity::ProjectRootId;
32use crate::protocol::{ProgressKind, PushFrame, RawRequest, Response};
33use crate::run_tool_call::{run_tool_call, ToolCallContext, ToolCallOutcome, ToolCallResult};
34use crate::runtime_drain;
35
36use subc_protocol::manifest::{
37 Bindings, Concurrency, ExecutionMode, IdentityBinding, IdentityScope, ModuleManifest,
38 ProviderRole, StorageBinding, StorageKind, StorageScope, Tool, TrustTier,
39};
40use subc_protocol::session::{ModuleControlRequest, ModuleControlResponse};
41use subc_protocol::{
42 ErrorBody, Flags, Frame, FrameType, ModuleHelloBody, Principal, Priority, PROTOCOL_VERSION,
43};
44use subc_transport::{authenticate_client, connection_file, read_frame, write_frame};
45use tokio::io::{AsyncRead, AsyncWrite};
46use tokio::net::TcpStream;
47use tokio::sync::{mpsc, oneshot, Notify};
48use tokio::task::JoinHandle;
49
/// Deadline passed to `authenticate_client` when authenticating to the daemon.
const AUTH_DEADLINE: Duration = Duration::from_secs(5);

/// Correlation id stamped on the module's `Hello` frame.
const HELLO_CORR: u64 = 1;

/// Maximum queue length enforced by `bounded_push_back`; once a queue reaches
/// this size the oldest entry is evicted before a new one is appended.
const PUSH_BUFFER_MAX_PER_KEY: usize = 256;

// NOTE(review): not referenced in this part of the file; presumably bounds how
// long a control-frame send may wait — confirm at the use site.
const CONTROL_SEND_TIMEOUT: Duration = Duration::from_millis(250);

/// Maximum number of task ids `CompletedTaskIds` remembers before it starts
/// evicting the oldest one.
const COMPLETED_TASK_SUPPRESSION_MAX: usize = 4096;

// NOTE(review): not referenced in this part of the file; presumably the polling
// period for some pending-work check — confirm at the use site.
const PENDING_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Key type of the per-route maps; `route_key` widens a wire-level `u16`
/// channel into this type.
type RouteChannel = u32;
/// A push frame tagged with the project root it is delivered for.
type PushEnvelope = (ProjectRootId, PushFrame);
/// Per-route queues of reliable frames waiting to be re-sent, each stored with
/// the replay key of the route it was addressed to.
type RetryBuffer = HashMap<RouteChannel, VecDeque<(ReplayKey, PushFrame)>>;
79
/// Sending halves of the two push queues fed by `progress_sender_for_root`.
#[derive(Clone)]
struct PushSenders {
    /// Bounded queue; producers offer frames with `try_send` and ignore failure.
    lossy_tx: mpsc::Sender<PushEnvelope>,
    /// Unbounded queue used for frames that `frame_is_reliable` accepts.
    reliable_tx: mpsc::UnboundedSender<PushEnvelope>,
}
85
86#[derive(Clone)]
87struct PersistentCancelSignal {
88 inner: Arc<PersistentCancelInner>,
89}
90
91struct PersistentCancelInner {
92 cancelled: AtomicBool,
93 notify: Notify,
94}
95
96impl PersistentCancelSignal {
97 fn new() -> Self {
98 Self {
99 inner: Arc::new(PersistentCancelInner {
100 cancelled: AtomicBool::new(false),
101 notify: Notify::new(),
102 }),
103 }
104 }
105
106 fn cancel(&self) {
107 if !self.inner.cancelled.swap(true, Ordering::SeqCst) {
108 self.inner.notify.notify_waiters();
109 }
110 }
111
112 fn is_cancelled(&self) -> bool {
113 self.inner.cancelled.load(Ordering::SeqCst)
114 }
115
116 async fn cancelled(&self) {
117 loop {
125 let notified = self.inner.notify.notified();
126 tokio::pin!(notified);
127 notified.as_mut().enable();
128 if self.is_cancelled() {
129 return;
130 }
131 notified.await;
132 }
133 }
134}
135
136#[derive(Clone)]
137struct BashWaitCancel {
138 connection: PersistentCancelSignal,
139 route: PersistentCancelSignal,
140}
141
142impl BashWaitCancel {
143 async fn cancelled(&self) {
144 tokio::select! {
145 _ = self.connection.cancelled() => {}
146 _ = self.route.cancelled() => {}
147 }
148 }
149}
150
/// Cancellation token for a route together with a wait counter.
// NOTE(review): the bookkeeping that updates these fields is outside this part
// of the file; `active_waits` presumably counts bash waits currently sharing
// `token` — confirm at the use site.
struct RouteBashCancel {
    token: PersistentCancelSignal,
    active_waits: usize,
}
155
/// Outcome of a deferred bash tool call, bundled with the frame coordinates
/// (`channel`, `corr`, `flags`, `ver`) and the request it belongs to.
// NOTE(review): producer and consumer are outside this part of the file;
// `result == None` and `fatal` presumably distinguish cancelled/failed waits —
// confirm against the code that handles this struct.
struct BashDeferredCompletion {
    channel: u16,
    corr: u64,
    flags: Flags,
    ver: u8,
    root: ProjectRootId,
    request_id: String,
    result: Option<ToolCallResult>,
    fatal: bool,
}
166
/// Trust level assigned to a bound route.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BindTrust {
    /// The peer is treated as first-party.
    FirstParty,
    /// Any other peer.
    Untrusted,
}

impl BindTrust {
    /// Only first-party binds may receive bash observation frames.
    fn allows_bash_observation(self) -> bool {
        match self {
            Self::FirstParty => true,
            Self::Untrusted => false,
        }
    }

    /// Stable lowercase label for this trust level.
    fn label(self) -> &'static str {
        if matches!(self, Self::FirstParty) {
            "first_party"
        } else {
            "untrusted"
        }
    }
}
185
186pub(crate) fn trust_for_principal(principal: &Option<Principal>) -> BindTrust {
187 match principal {
188 Some(Principal::Direct) => BindTrust::FirstParty,
189 Some(Principal::Reserved { module_id })
190 if module_id == "llm-runner" || module_id == "aft" =>
191 {
192 BindTrust::FirstParty
193 }
194 Some(Principal::Reserved { .. }) | Some(Principal::Unverified) | None => {
195 BindTrust::Untrusted
196 }
197 }
198}
199
200fn principal_label(principal: &Option<Principal>) -> String {
201 match principal {
202 Some(Principal::Direct) => "direct".to_string(),
203 Some(Principal::Reserved { module_id }) => format!("reserved:{module_id}"),
204 Some(Principal::Unverified) => "unverified".to_string(),
205 None => "absent".to_string(),
206 }
207}
208
/// Per-project-root bookkeeping.
// NOTE(review): the code that reads these fields is outside this part of the
// file; the per-field notes below are inferred from names — confirm at use sites.
#[derive(Debug)]
struct RootMeta {
    // Presumably set while a maintenance run for this root is outstanding.
    maintenance_pending: bool,
    /// Refreshed to `Instant::now()` by `RootMeta::touch`.
    last_touched: Instant,
    // Presumably mirrors a configuration switch for diagnostics after edits.
    diagnostics_on_edit: bool,
    // Presumably the number of bash waits currently in flight for this root.
    active_bash_waits: usize,
}
221
/// State recorded for a route bind that has not completed yet.
// NOTE(review): the bind flow itself is outside this part of the file;
// `inserted_new_actor` presumably records whether the bind created the root's
// actor and `cancelled` whether the bind was abandoned meanwhile — confirm.
#[derive(Debug)]
struct PendingBind {
    bind_root_id: ProjectRootId,
    inserted_new_actor: bool,
    cancelled: bool,
}
228
/// Result of a finished route bind: the route's identity, the responses that
/// were produced, and the frame coordinates (`ver`, `corr`, `flags`) recorded
/// alongside them.
// NOTE(review): producer and consumer are outside this part of the file;
// presumably `ver`/`corr`/`flags` are those of the originating request and are
// echoed in the reply — confirm.
struct RouteBindCompletion {
    route_channel: u16,
    identity: RouteIdentity,
    bind_root_id: ProjectRootId,
    inserted_new_actor: bool,
    configure_response: Response,
    drain_response: Option<Response>,
    diagnostics_on_edit: bool,
    ver: u8,
    corr: u64,
    flags: Flags,
}
241
/// Identity attached to a bound route channel.
#[derive(Debug, Clone)]
struct RouteIdentity {
    /// Project root the route belongs to; key of the root→channels index.
    root: ProjectRootId,
    // NOTE(review): not read in this part of the file; presumably the
    // filesystem path behind `root` — confirm.
    project_root: PathBuf,
    /// Harness name; part of the route's `ReplayKey`.
    harness: String,
    /// Session id; session-scoped push frames are matched against it.
    session: String,
    /// Trust level; gates delivery of bash observation frames.
    trust: BindTrust,
}
250
/// Harness and trust remembered per `(root, session)` by
/// `remember_session_identity`, so `replay_key_for_session` can rebuild a
/// replay key without consulting a live route.
#[derive(Debug, Clone)]
struct RetainedSessionIdentity {
    harness: String,
    trust: BindTrust,
}
256
/// Frame coordinates of a background-events subscription; every stream frame
/// built by `try_send_bg_stream_frame` reuses them.
#[derive(Clone, Copy)]
struct BgSub {
    /// Correlation id placed on each stream frame.
    corr: u64,
    /// Protocol version the stream frames are built with.
    ver: u8,
    /// Flags placed on each stream frame.
    flags: Flags,
}
263
/// Result of a maintenance run for one project root.
// NOTE(review): the producer is outside this part of the file;
// `empty_bg_sessions` presumably holds `(session, wake epoch at submit)` pairs
// as consumed by `clear_stale_bg_wakes_for_empty_sessions` — confirm.
struct MaintenanceCompletion {
    root_id: ProjectRootId,
    response: Response,
    empty_bg_sessions: Vec<(String, u64)>,
}
269
270#[derive(Debug, Clone, PartialEq, Eq, Hash)]
271struct ReplayKey {
272 root: ProjectRootId,
273 harness: String,
274 session: String,
275}
276
277impl ReplayKey {
278 fn from_identity(identity: &RouteIdentity) -> Self {
279 Self {
280 root: identity.root.clone(),
281 harness: identity.harness.clone(),
282 session: identity.session.clone(),
283 }
284 }
285}
286
287#[derive(Debug, Default)]
288struct CompletedTaskIds {
289 order: VecDeque<String>,
290 set: HashSet<String>,
291}
292
293impl CompletedTaskIds {
294 fn remember(&mut self, task_id: &str) {
295 if self.set.contains(task_id) {
296 return;
297 }
298 if self.order.len() >= COMPLETED_TASK_SUPPRESSION_MAX {
299 if let Some(evicted) = self.order.pop_front() {
300 self.set.remove(&evicted);
301 }
302 }
303 let task_id = task_id.to_string();
304 self.order.push_back(task_id.clone());
305 self.set.insert(task_id);
306 }
307
308 fn contains(&self, task_id: &str) -> bool {
309 self.set.contains(task_id)
310 }
311}
312
313impl RootMeta {
314 fn new(now: Instant) -> Self {
315 Self {
316 maintenance_pending: false,
317 last_touched: now,
318 diagnostics_on_edit: false,
319 active_bash_waits: 0,
320 }
321 }
322
323 fn touch(&mut self) {
324 self.last_touched = Instant::now();
325 }
326}
327
328fn route_key(channel: u16) -> RouteChannel {
329 RouteChannel::from(channel)
330}
331
332fn remove_root_channel(
333 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
334 root: &ProjectRootId,
335 channel: RouteChannel,
336) {
337 let remove_root = if let Some(channels) = root_channels.get_mut(root) {
338 channels.remove(&channel);
339 channels.is_empty()
340 } else {
341 false
342 };
343 if remove_root {
344 root_channels.remove(root);
345 }
346}
347
348fn remove_route_channel(
349 routes: &mut HashMap<RouteChannel, RouteIdentity>,
350 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
351 channel: RouteChannel,
352) -> Option<RouteIdentity> {
353 let removed = routes.remove(&channel);
354 if let Some(identity) = &removed {
355 remove_root_channel(root_channels, &identity.root, channel);
356 }
357 removed
358}
359
360fn insert_route_channel(
361 routes: &mut HashMap<RouteChannel, RouteIdentity>,
362 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
363 channel: RouteChannel,
364 identity: RouteIdentity,
365) {
366 if let Some(previous) = routes.insert(channel, identity.clone()) {
367 remove_root_channel(root_channels, &previous.root, channel);
368 }
369 root_channels
370 .entry(identity.root.clone())
371 .or_default()
372 .insert(channel);
373}
374
375fn remove_bg_subscription_index(
376 bg_sub_by_session: &mut HashMap<(ProjectRootId, String), RouteChannel>,
377 channel: RouteChannel,
378 identity: Option<&RouteIdentity>,
379) {
380 if let Some(identity) = identity {
381 let key = (identity.root.clone(), identity.session.clone());
382 if bg_sub_by_session.get(&key).copied() == Some(channel) {
383 bg_sub_by_session.remove(&key);
384 }
385 } else {
386 bg_sub_by_session.retain(|_, mapped_channel| *mapped_channel != channel);
387 }
388}
389
390fn end_bg_subscription(
391 writer_tx: &mpsc::Sender<Frame>,
392 bg_subs: &mut HashMap<RouteChannel, BgSub>,
393 bg_sub_by_session: &mut HashMap<(ProjectRootId, String), RouteChannel>,
394 bg_wake_pending: &mut HashSet<RouteChannel>,
395 channel: RouteChannel,
396 identity: Option<&RouteIdentity>,
397) {
398 if let Some(sub) = bg_subs.get(&channel).copied() {
399 let _ = try_send_bg_stream_end(writer_tx, channel, &sub);
400 bg_subs.remove(&channel);
401 bg_wake_pending.remove(&channel);
402 remove_bg_subscription_index(bg_sub_by_session, channel, identity);
403 }
404}
405
406fn remember_session_identity(
407 session_identity: &mut HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
408 identity: &RouteIdentity,
409) {
410 let key = (identity.root.clone(), identity.session.clone());
411 if matches!(identity.trust, BindTrust::Untrusted)
412 && session_identity
413 .get(&key)
414 .is_some_and(|retained| matches!(retained.trust, BindTrust::FirstParty))
415 {
416 return;
417 }
418
419 session_identity.insert(
424 key,
425 RetainedSessionIdentity {
426 harness: identity.harness.clone(),
427 trust: identity.trust,
428 },
429 );
430}
431
432fn replay_key_for_session(
433 session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
434 root: &ProjectRootId,
435 session: &str,
436) -> Option<(ReplayKey, BindTrust)> {
437 let retained = session_identity.get(&(root.clone(), session.to_string()))?;
438 Some((
439 ReplayKey {
440 root: root.clone(),
441 harness: retained.harness.clone(),
442 session: session.to_string(),
443 },
444 retained.trust,
445 ))
446}
447
448fn frame_session(frame: &PushFrame) -> Option<&str> {
449 match frame {
450 PushFrame::BashCompleted(completed) => Some(completed.session_id.as_str()),
451 PushFrame::BashLongRunning(long_running) => Some(long_running.session_id.as_str()),
452 PushFrame::BashPatternMatch(pattern_match) => Some(pattern_match.session_id.as_str()),
453 PushFrame::ConfigureWarnings(warnings) => warnings.session_id.as_deref(),
454 PushFrame::StatusChanged(status) => status.session_id.as_deref(),
455 PushFrame::Progress(_) => None,
456 }
457}
458
459fn frame_is_reliable(frame: &PushFrame) -> bool {
460 matches!(
461 frame,
462 PushFrame::BashCompleted(_)
463 | PushFrame::BashPatternMatch(_)
464 | PushFrame::ConfigureWarnings(_)
465 )
466}
467
468fn frame_is_bash_observation(frame: &PushFrame) -> bool {
469 matches!(
470 frame,
471 PushFrame::BashCompleted(_)
472 | PushFrame::BashLongRunning(_)
473 | PushFrame::BashPatternMatch(_)
474 )
475}
476
477fn completed_task_id(frame: &PushFrame) -> Option<&str> {
478 match frame {
479 PushFrame::BashCompleted(completed) => Some(completed.task_id.as_str()),
480 _ => None,
481 }
482}
483
484fn completed_bg_session_key(
485 root: &ProjectRootId,
486 frame: &PushFrame,
487) -> Option<(ProjectRootId, String)> {
488 match frame {
489 PushFrame::BashCompleted(completed) => Some((root.clone(), completed.session_id.clone())),
490 _ => None,
491 }
492}
493
494fn long_running_task_id(frame: &PushFrame) -> Option<&str> {
495 match frame {
496 PushFrame::BashLongRunning(long_running) => Some(long_running.task_id.as_str()),
497 _ => None,
498 }
499}
500
501fn should_drop_lossy_push(completed_tasks: &CompletedTaskIds, frame: &PushFrame) -> bool {
502 long_running_task_id(frame).is_some_and(|task_id| completed_tasks.contains(task_id))
503}
504
505fn progress_sender_for_root(push_senders: PushSenders, root_id: ProjectRootId) -> ProgressSender {
506 Arc::new(Box::new(move |frame: PushFrame| {
507 if frame_is_reliable(&frame) {
512 let _ = push_senders.reliable_tx.send((root_id.clone(), frame));
513 } else {
514 let _ = push_senders.lossy_tx.try_send((root_id.clone(), frame));
515 }
516 }))
517}
518
519#[derive(Debug, Clone, PartialEq, Eq, Hash)]
520enum LossyProgressKind {
521 Stdout,
522 Stderr,
523}
524
525impl From<&ProgressKind> for LossyProgressKind {
526 fn from(kind: &ProgressKind) -> Self {
527 match kind {
528 ProgressKind::Stdout => Self::Stdout,
529 ProgressKind::Stderr => Self::Stderr,
530 }
531 }
532}
533
/// Coalescing key for lossy push frames: within one batch,
/// `coalesce_push_batch` keeps only the latest frame per `(root, key)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum LossyPushKey {
    /// Progress output, keyed by request and stream kind.
    Progress {
        request_id: String,
        kind: LossyProgressKind,
    },
    /// Status updates share a single key.
    StatusChanged,
    /// Long-running notices, keyed by task.
    BashLongRunning {
        task_id: String,
    },
}
545
546fn lossy_push_key(frame: &PushFrame) -> Option<LossyPushKey> {
547 match frame {
548 PushFrame::Progress(progress) => Some(LossyPushKey::Progress {
549 request_id: progress.request_id.clone(),
550 kind: LossyProgressKind::from(&progress.kind),
551 }),
552 PushFrame::StatusChanged(_) => Some(LossyPushKey::StatusChanged),
553 PushFrame::BashLongRunning(long_running) => Some(LossyPushKey::BashLongRunning {
554 task_id: long_running.task_id.clone(),
555 }),
556 PushFrame::BashCompleted(_)
557 | PushFrame::BashPatternMatch(_)
558 | PushFrame::ConfigureWarnings(_) => None,
559 }
560}
561
562fn coalesce_push_batch(batch: Vec<(ProjectRootId, PushFrame)>) -> Vec<(ProjectRootId, PushFrame)> {
563 let mut slots: Vec<Option<(ProjectRootId, PushFrame)>> = Vec::with_capacity(batch.len());
564 let mut latest_lossy: HashMap<(ProjectRootId, LossyPushKey), usize> = HashMap::new();
565
566 for (root, frame) in batch {
567 if let Some(lossy_key) = lossy_push_key(&frame) {
568 let map_key = (root.clone(), lossy_key);
569 if let Some(previous_index) = latest_lossy.insert(map_key, slots.len()) {
570 slots[previous_index] = None;
571 }
572 }
573 slots.push(Some((root, frame)));
574 }
575
576 slots.into_iter().flatten().collect()
577}
578
/// Summary of one fan-out attempt.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct FanOutResult {
    /// Number of route channels the frame matched.
    matched_channels: usize,
    /// Number of those channels for which the send reported `Sent`.
    sent_frames: usize,
}
589
/// Result of offering one frame to the writer queue without waiting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushSendOutcome {
    /// The frame was queued.
    Sent,
    /// The writer queue was full.
    Backpressure,
    /// The frame could not be built or serialized, the channel id was out of
    /// range, or the writer queue was closed.
    PermanentFailure,
}
596
597fn try_send_push_body(
598 writer_tx: &mpsc::Sender<Frame>,
599 channel: RouteChannel,
600 body: &[u8],
601) -> PushSendOutcome {
602 let Ok(route_channel) = u16::try_from(channel) else {
603 log::warn!("subc attach: invalid route channel {channel} for Push fan-out");
604 return PushSendOutcome::PermanentFailure;
605 };
606 let push_frame = match Frame::build_with_version(
607 PROTOCOL_VERSION,
608 FrameType::Push,
609 control_flags(),
610 route_channel,
611 0,
612 body.to_vec(),
613 ) {
614 Ok(frame) => frame,
615 Err(error) => {
616 log::warn!("subc attach: failed to build Push frame: {error}");
617 return PushSendOutcome::PermanentFailure;
618 }
619 };
620 match writer_tx.try_send(push_frame) {
621 Ok(()) => PushSendOutcome::Sent,
622 Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
623 Err(mpsc::error::TrySendError::Closed(_)) => {
624 log::warn!("subc attach: writer closed while sending Push frame");
625 PushSendOutcome::PermanentFailure
626 }
627 }
628}
629
630fn try_send_push_frame(
631 writer_tx: &mpsc::Sender<Frame>,
632 channel: RouteChannel,
633 frame: &PushFrame,
634) -> PushSendOutcome {
635 let body = match serde_json::to_vec(frame) {
636 Ok(body) => body,
637 Err(error) => {
638 log::warn!("subc attach: failed to serialize PushFrame: {error}");
639 return PushSendOutcome::PermanentFailure;
640 }
641 };
642 try_send_push_body(writer_tx, channel, &body)
643}
644
645fn try_send_bg_stream_frame(
646 writer_tx: &mpsc::Sender<Frame>,
647 channel: RouteChannel,
648 sub: &BgSub,
649 ty: FrameType,
650 body: Vec<u8>,
651) -> PushSendOutcome {
652 let Ok(route_channel) = u16::try_from(channel) else {
653 log::warn!("subc attach: invalid route channel {channel} for bg_events stream");
654 return PushSendOutcome::PermanentFailure;
655 };
656 let frame =
657 match Frame::build_with_version(sub.ver, ty, sub.flags, route_channel, sub.corr, body) {
658 Ok(frame) => frame,
659 Err(error) => {
660 log::warn!("subc attach: failed to build bg_events stream frame: {error}");
661 return PushSendOutcome::PermanentFailure;
662 }
663 };
664 match writer_tx.try_send(frame) {
665 Ok(()) => PushSendOutcome::Sent,
666 Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
667 Err(mpsc::error::TrySendError::Closed(_)) => {
668 log::warn!("subc attach: writer closed while sending bg_events stream frame");
669 PushSendOutcome::PermanentFailure
670 }
671 }
672}
673
674fn try_send_bg_stream_data(
675 writer_tx: &mpsc::Sender<Frame>,
676 channel: RouteChannel,
677 sub: &BgSub,
678) -> PushSendOutcome {
679 let body = match serde_json::to_vec(&json!({ "op": "bg_events" })) {
680 Ok(body) => body,
681 Err(error) => {
682 log::warn!("subc attach: failed to serialize bg_events stream payload: {error}");
683 return PushSendOutcome::PermanentFailure;
684 }
685 };
686 try_send_bg_stream_frame(writer_tx, channel, sub, FrameType::StreamData, body)
687}
688
689fn try_send_bg_stream_end(
690 writer_tx: &mpsc::Sender<Frame>,
691 channel: RouteChannel,
692 sub: &BgSub,
693) -> PushSendOutcome {
694 try_send_bg_stream_frame(writer_tx, channel, sub, FrameType::StreamEnd, Vec::new())
695}
696
697fn emit_bg_event_wakes(
698 writer_tx: &mpsc::Sender<Frame>,
699 bg_subs: &HashMap<RouteChannel, BgSub>,
700 bg_wake_pending: &mut HashSet<RouteChannel>,
701) {
702 let pending_channels: Vec<RouteChannel> = bg_wake_pending.iter().copied().collect();
703 let mut stale_channels = Vec::new();
704 for channel in pending_channels {
705 if let Some(sub) = bg_subs.get(&channel) {
706 let _ = try_send_bg_stream_data(writer_tx, channel, sub);
707 } else {
708 stale_channels.push(channel);
709 }
710 }
711 for channel in stale_channels {
712 bg_wake_pending.remove(&channel);
713 }
714}
715
716fn arm_bg_wake(
721 root: ProjectRootId,
722 session: String,
723 channel: RouteChannel,
724 bg_wake_pending: &mut HashSet<RouteChannel>,
725 bg_wake_epoch: &mut HashMap<(ProjectRootId, String), u64>,
726) {
727 *bg_wake_epoch.entry((root, session)).or_default() += 1;
728 bg_wake_pending.insert(channel);
729}
730
731fn clear_stale_bg_wakes_for_empty_sessions(
732 root_id: &ProjectRootId,
733 empty_bg_sessions: &[(String, u64)],
734 bg_sub_by_session: &HashMap<(ProjectRootId, String), RouteChannel>,
735 bg_wake_pending: &mut HashSet<RouteChannel>,
736 bg_wake_epoch: &HashMap<(ProjectRootId, String), u64>,
737) {
738 for (session, epoch_at_submit) in empty_bg_sessions {
739 let key = (root_id.clone(), session.clone());
740 if bg_wake_epoch.get(&key).copied() == Some(*epoch_at_submit) {
741 if let Some(channel) = bg_sub_by_session.get(&key).copied() {
742 bg_wake_pending.remove(&channel);
743 }
744 }
745 }
746}
747
748fn bounded_push_back<T>(queue: &mut VecDeque<T>, item: T) {
749 if queue.len() >= PUSH_BUFFER_MAX_PER_KEY {
750 queue.pop_front();
751 }
752 queue.push_back(item);
753}
754
755fn buffer_push_frame(
756 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
757 key: ReplayKey,
758 frame: PushFrame,
759) {
760 bounded_push_back(push_buffer.entry(key).or_default(), frame);
761}
762
763fn buffer_retry_frame(
764 retry_buffer: &mut RetryBuffer,
765 channel: RouteChannel,
766 key: ReplayKey,
767 frame: PushFrame,
768) {
769 bounded_push_back(retry_buffer.entry(channel).or_default(), (key, frame));
770}
771
772fn migrate_retry_buffer_to_push_buffer(
773 retry_buffer: &mut RetryBuffer,
774 channel: RouteChannel,
775 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
776) -> usize {
777 let Some(frames) = retry_buffer.remove(&channel) else {
778 return 0;
779 };
780 let migrated = frames.len();
781 for (key, frame) in frames {
782 buffer_push_frame(push_buffer, key, frame);
783 }
784 migrated
785}
786
787fn replay_buffered_push_frames(
788 writer_tx: &mpsc::Sender<Frame>,
789 channel: RouteChannel,
790 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
791 key: &ReplayKey,
792 trust: BindTrust,
793) -> usize {
794 let mut sent = 0;
795 let remove_empty;
796
797 {
798 let Some(queue) = push_buffer.get_mut(key) else {
799 return 0;
800 };
801
802 while let Some(frame) = queue.pop_front() {
803 if frame_is_bash_observation(&frame) && !trust.allows_bash_observation() {
804 continue;
805 }
806 match try_send_push_frame(writer_tx, channel, &frame) {
807 PushSendOutcome::Sent => sent += 1,
808 PushSendOutcome::Backpressure => {
809 queue.push_front(frame);
810 break;
811 }
812 PushSendOutcome::PermanentFailure => {
813 log::warn!(
814 "subc attach: dropping buffered reliable Push for root {} harness {} session {} after permanent send failure",
815 key.root.as_path().display(),
816 key.harness,
817 key.session
818 );
819 }
820 }
821 }
822
823 remove_empty = queue.is_empty();
824 }
825
826 if remove_empty {
827 push_buffer.remove(key);
828 }
829
830 sent
831}
832
833fn drain_retry_buffer_for_channel(
834 writer_tx: &mpsc::Sender<Frame>,
835 channel: RouteChannel,
836 retry_buffer: &mut RetryBuffer,
837) -> usize {
838 let mut sent = 0;
839 let remove_empty;
840
841 {
842 let Some(queue) = retry_buffer.get_mut(&channel) else {
843 return 0;
844 };
845
846 while let Some((key, frame)) = queue.pop_front() {
847 match try_send_push_frame(writer_tx, channel, &frame) {
848 PushSendOutcome::Sent => sent += 1,
849 PushSendOutcome::Backpressure => {
850 queue.push_front((key, frame));
851 break;
852 }
853 PushSendOutcome::PermanentFailure => {
854 log::warn!(
855 "subc attach: dropping retry-buffered reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
856 key.root.as_path().display(),
857 key.harness,
858 key.session
859 );
860 }
861 }
862 }
863
864 remove_empty = queue.is_empty();
865 }
866
867 if remove_empty {
868 retry_buffer.remove(&channel);
869 }
870
871 sent
872}
873
874fn drain_retry_buffers_for_bound_routes(
875 writer_tx: &mpsc::Sender<Frame>,
876 routes: &HashMap<RouteChannel, RouteIdentity>,
877 retry_buffer: &mut RetryBuffer,
878) -> usize {
879 let channels: Vec<RouteChannel> = routes.keys().copied().collect();
880 channels
881 .into_iter()
882 .map(|channel| drain_retry_buffer_for_channel(writer_tx, channel, retry_buffer))
883 .sum()
884}
885
886fn matching_route_channels(
887 routes: &HashMap<RouteChannel, RouteIdentity>,
888 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
889 root: &ProjectRootId,
890 frame: &PushFrame,
891) -> Vec<RouteChannel> {
892 let Some(channels) = root_channels.get(root) else {
893 return Vec::new();
894 };
895
896 let session = frame_session(frame);
897 let bash_observation = frame_is_bash_observation(frame);
898 channels
899 .iter()
900 .copied()
901 .filter(|channel| {
902 let Some(identity) = routes.get(channel) else {
903 return !bash_observation && session.is_none();
904 };
905 if bash_observation && !identity.trust.allows_bash_observation() {
906 return false;
907 }
908 match session {
909 Some(session) => identity.session == session,
910 None => true,
911 }
912 })
913 .collect()
914}
915
916fn buffer_detached_reliable_push_frame(
917 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
918 session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
919 root: &ProjectRootId,
920 frame: &PushFrame,
921) {
922 let Some(session) = frame_session(frame) else {
923 log::warn!(
924 "subc attach: dropping reliable project-scoped Push for root {} because no route is bound",
925 root.as_path().display()
926 );
927 return;
928 };
929
930 if let Some((key, trust)) = replay_key_for_session(session_identity, root, session) {
931 if frame_is_bash_observation(frame) && !trust.allows_bash_observation() {
932 return;
933 }
934 buffer_push_frame(push_buffer, key, frame.clone());
935 } else {
936 log::warn!(
937 "subc attach: dropping reliable Push for root {} session {} because no retained harness identity is known",
938 root.as_path().display(),
939 session
940 );
941 }
942}
943
944fn fan_out_lossy_push_frame(
945 writer_tx: &mpsc::Sender<Frame>,
946 routes: &HashMap<RouteChannel, RouteIdentity>,
947 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
948 root: &ProjectRootId,
949 frame: &PushFrame,
950) -> FanOutResult {
951 let matching_channels = matching_route_channels(routes, root_channels, root, frame);
952 let matched_channels = matching_channels.len();
953 if matched_channels == 0 {
954 return FanOutResult::default();
955 }
956
957 let body = match serde_json::to_vec(frame) {
958 Ok(body) => body,
959 Err(error) => {
960 log::warn!("subc attach: failed to serialize PushFrame for fan-out: {error}");
961 return FanOutResult {
962 matched_channels,
963 sent_frames: 0,
964 };
965 }
966 };
967
968 let sent_frames = matching_channels
969 .into_iter()
970 .filter(|&channel| {
971 matches!(
972 try_send_push_body(writer_tx, channel, &body),
973 PushSendOutcome::Sent
974 )
975 })
976 .count();
977
978 FanOutResult {
979 matched_channels,
980 sent_frames,
981 }
982}
983
984fn fan_out_reliable_push_frame(
985 writer_tx: &mpsc::Sender<Frame>,
986 routes: &HashMap<RouteChannel, RouteIdentity>,
987 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
988 session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
989 retry_buffer: &mut RetryBuffer,
990 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
991 root: &ProjectRootId,
992 frame: &PushFrame,
993) -> FanOutResult {
994 let matching_channels = matching_route_channels(routes, root_channels, root, frame);
995 let matched_channels = matching_channels.len();
996 if matched_channels == 0 {
997 buffer_detached_reliable_push_frame(push_buffer, session_identity, root, frame);
998 return FanOutResult::default();
999 }
1000
1001 let mut sent_frames = 0;
1002 for channel in matching_channels {
1003 let Some(identity) = routes.get(&channel) else {
1004 log::warn!(
1005 "subc attach: dropping reliable Push for stale route channel {channel} with no route identity"
1006 );
1007 continue;
1008 };
1009 let key = ReplayKey::from_identity(identity);
1010
1011 if retry_buffer
1012 .get(&channel)
1013 .is_some_and(|queue| !queue.is_empty())
1014 {
1015 buffer_retry_frame(retry_buffer, channel, key, frame.clone());
1016 continue;
1017 }
1018
1019 match try_send_push_frame(writer_tx, channel, frame) {
1020 PushSendOutcome::Sent => sent_frames += 1,
1021 PushSendOutcome::Backpressure => {
1022 buffer_retry_frame(retry_buffer, channel, key, frame.clone());
1023 }
1024 PushSendOutcome::PermanentFailure => {
1025 log::warn!(
1026 "subc attach: dropping reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
1027 key.root.as_path().display(),
1028 key.harness,
1029 key.session
1030 );
1031 }
1032 }
1033 }
1034
1035 FanOutResult {
1036 matched_channels,
1037 sent_frames,
1038 }
1039}
1040
1041fn process_reliable_push_frame(
1042 writer_tx: &mpsc::Sender<Frame>,
1043 routes: &HashMap<RouteChannel, RouteIdentity>,
1044 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
1045 session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
1046 retry_buffer: &mut RetryBuffer,
1047 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1048 completed_tasks: &mut CompletedTaskIds,
1049 root: ProjectRootId,
1050 frame: PushFrame,
1051) -> Option<(ProjectRootId, String)> {
1052 let completed_bg_session = completed_bg_session_key(&root, &frame);
1053 if let Some(task_id) = completed_task_id(&frame) {
1054 completed_tasks.remember(task_id);
1055 }
1056 let _ = fan_out_reliable_push_frame(
1057 writer_tx,
1058 routes,
1059 root_channels,
1060 session_identity,
1061 retry_buffer,
1062 push_buffer,
1063 &root,
1064 &frame,
1065 );
1066 completed_bg_session
1067}
1068
1069fn process_lossy_push_frame(
1070 writer_tx: &mpsc::Sender<Frame>,
1071 routes: &HashMap<RouteChannel, RouteIdentity>,
1072 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
1073 completed_tasks: &CompletedTaskIds,
1074 root: ProjectRootId,
1075 frame: PushFrame,
1076) {
1077 if should_drop_lossy_push(completed_tasks, &frame) {
1078 if let Some(task_id) = long_running_task_id(&frame) {
1079 log::debug!(
1080 "subc attach: dropping stale BashLongRunning Push for completed task {task_id}"
1081 );
1082 }
1083 return;
1084 }
1085
1086 let _ = fan_out_lossy_push_frame(writer_tx, routes, root_channels, &root, &frame);
1087}
1088
/// Signature of the request dispatcher handed to `run_subc_mode`: it receives
/// a raw request plus the application context and returns the response.
pub type DispatchFn = fn(RawRequest, &AppContext) -> Response;
1092
1093pub fn run_subc_mode(
1098 connection_file_path: &Path,
1099 ctx: Arc<AppContext>,
1100 executor: Arc<Executor>,
1101 dispatch: DispatchFn,
1102 user_config_path: Option<PathBuf>,
1103) -> Result<(), SubcError> {
1104 run_subc_mode_inner(
1108 connection_file_path,
1109 ctx,
1110 executor,
1111 dispatch,
1112 user_config_path,
1113 false,
1114 )
1115}
1116
1117fn run_subc_mode_inner(
1118 connection_file_path: &Path,
1119 ctx: Arc<AppContext>,
1120 executor: Arc<Executor>,
1121 dispatch: DispatchFn,
1122 user_config_path: Option<PathBuf>,
1123 allow_native_passthrough: bool,
1124) -> Result<(), SubcError> {
1125 let runtime = tokio::runtime::Builder::new_current_thread()
1126 .enable_all()
1127 .build()
1128 .map_err(SubcError::Runtime)?;
1129
1130 let executor_for_loop = Arc::clone(&executor);
1131 let loop_result = runtime.block_on(async move {
1132 let shared_app = ctx.app();
1133 drop(ctx);
1134 let stream = connect_and_authenticate(connection_file_path).await?;
1135 log::info!(
1136 "subc attach: authenticated to daemon via {}",
1137 connection_file_path.display()
1138 );
1139 let (read_half, write_half) = tokio::io::split(stream);
1140 run_module_loop(
1141 read_half,
1142 write_half,
1143 shared_app,
1144 executor_for_loop,
1145 dispatch,
1146 user_config_path,
1147 allow_native_passthrough,
1148 )
1149 .await
1150 });
1151
1152 for actor_ctx in executor.actor_contexts() {
1153 actor_ctx.lsp().shutdown_all();
1154 actor_ctx.bash_background().detach();
1155 }
1156
1157 loop_result
1158}
1159
1160#[doc(hidden)]
1165pub fn run_subc_mode_for_test(
1166 connection_file_path: &Path,
1167 ctx: Arc<AppContext>,
1168 executor: Arc<Executor>,
1169 dispatch: DispatchFn,
1170 user_config_path: Option<PathBuf>,
1171) -> Result<(), SubcError> {
1172 run_subc_mode_inner(
1173 connection_file_path,
1174 ctx,
1175 executor,
1176 dispatch,
1177 user_config_path,
1178 true,
1179 )
1180}
1181
1182async fn connect_and_authenticate(connection_file_path: &Path) -> Result<TcpStream, SubcError> {
1185 let conn = connection_file::read(connection_file_path).map_err(|source| {
1186 SubcError::ConnectionFile {
1187 path: connection_file_path.to_path_buf(),
1188 source,
1189 }
1190 })?;
1191
1192 let endpoint = conn
1193 .endpoints
1194 .first()
1195 .ok_or_else(|| SubcError::NoEndpoint {
1196 path: connection_file_path.to_path_buf(),
1197 })?;
1198 let endpoint_label = format!("{}:{}", endpoint.host, endpoint.port);
1199 let ip = endpoint
1200 .host
1201 .parse::<IpAddr>()
1202 .map_err(|_| SubcError::InvalidEndpoint {
1203 path: connection_file_path.to_path_buf(),
1204 endpoint: endpoint_label.clone(),
1205 })?;
1206 let addr = SocketAddr::new(ip, endpoint.port);
1207
1208 let mut stream = TcpStream::connect(addr)
1209 .await
1210 .map_err(|source| SubcError::Connect {
1211 endpoint: endpoint_label.clone(),
1212 source,
1213 })?;
1214
1215 authenticate_client(&mut stream, &conn, AUTH_DEADLINE)
1216 .await
1217 .map_err(|source| SubcError::Auth {
1218 endpoint: endpoint_label,
1219 source,
1220 })?;
1221
1222 Ok(stream)
1223}
1224
/// Runs one attached module session over the given read/write halves.
///
/// The function first performs the Hello handshake (sends a `Hello` frame
/// carrying the manifest and waits for a `HelloAck`), then spawns a writer task
/// and a reader task and multiplexes, in a single `select!` loop:
/// inbound frames, reliable and lossy push envelopes, RouteBind completions,
/// deferred bash completions, bash poll touches, maintenance completions, and
/// a 250 ms timer tick.
///
/// Returns `Ok(())` when the daemon closes the connection, sends a channel-0
/// `Goodbye`, or a fatal teardown is signalled through `shutdown`; returns the
/// first error otherwise. After the loop ends the writer task is given a short
/// grace period to finish, and its error (if any) is reported when the loop
/// itself ended cleanly.
1225async fn run_module_loop<R, W>(
1229 mut read: R,
1230 mut write: W,
1231 shared_app: Arc<App>,
1232 executor: Arc<Executor>,
1233 dispatch: DispatchFn,
1234 user_config_path: Option<PathBuf>,
1235 allow_native_passthrough: bool,
1236) -> Result<(), SubcError>
1237where
1238 R: AsyncRead + Unpin + Send + 'static,
1239 W: AsyncWrite + Unpin + Send + 'static,
1240{
// --- Hello handshake -------------------------------------------------------
// The launch nonce is taken from the SUBC_LAUNCH_NONCE environment variable
// when it is set; otherwise it is omitted.
1241 let hello = ModuleHelloBody {
1245 manifest: build_manifest(),
1246 protocol_ver: PROTOCOL_VERSION,
1247 control_ops: None,
1248 launch_nonce: std::env::var("SUBC_LAUNCH_NONCE").ok(),
1249 };
1250 let hello_frame = Frame::build(
1251 FrameType::Hello,
1252 control_flags(),
1253 0,
1254 HELLO_CORR,
1255 serde_json::to_vec(&hello).map_err(SubcError::Json)?,
1256 )
1257 .map_err(SubcError::FrameBuild)?;
1258 write_frame(&mut write, &hello_frame)
1259 .await
1260 .map_err(SubcError::FrameIo)?;
1261
// The first frame read back must be a HelloAck. EOF, an Error frame, or any
// other frame type aborts the attach with a dedicated error variant.
1262 match read_frame(&mut read).await.map_err(SubcError::FrameIo)? {
1264 None => return Err(SubcError::ClosedBeforeHelloAck),
1265 Some(frame) => match frame.header.ty {
1266 FrameType::HelloAck => {
1267 log::info!("subc attach: registered (HelloAck received)");
1268 }
1269 FrameType::Error => {
// The error body is best-effort: an undecodable body becomes `None`.
1270 let body = serde_json::from_slice::<ErrorBody>(&frame.body).ok();
1271 return Err(SubcError::HelloRejected { body });
1272 }
1273 other => return Err(SubcError::UnexpectedFrame { ty: other }),
1274 },
1275 }
1276
// --- Tasks and channels ----------------------------------------------------
// From here on all socket I/O happens in the spawned writer and reader tasks;
// the loop below talks to them through bounded channels.
1277 let (writer_tx, writer_rx) = mpsc::channel::<Frame>(256);
1278 let writer_task = spawn_writer_task(write, writer_rx);
1279 let (reader_tx, mut reader_rx) = mpsc::channel::<Result<Frame, SubcError>>(256);
1286 let reader_task = spawn_reader_task(read, reader_tx);
1287 let shutdown = Arc::new(Notify::new());
1288 let mut drain_interval = tokio::time::interval(Duration::from_millis(250));
1289 let (maintenance_tx, mut maintenance_rx) = mpsc::channel::<MaintenanceCompletion>(256);
1290 let (bash_deferred_tx, mut bash_deferred_rx) = mpsc::channel::<BashDeferredCompletion>(256);
1291 let (bash_poll_touch_tx, mut bash_poll_touch_rx) = mpsc::channel::<ProjectRootId>(256);
1292 let (control_completion_tx, mut control_completion_rx) =
1293 mpsc::channel::<RouteBindCompletion>(256);
// Push envelopes travel on two queues: a bounded "lossy" one and an unbounded
// "reliable" one.
1294 let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1024);
1295 let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
1296 let push_senders = PushSenders {
1297 lossy_tx,
1298 reliable_tx,
1299 };
// --- Per-connection state, all owned by this loop --------------------------
1300 let connection_cancel = PersistentCancelSignal::new();
1301 let mut routes: HashMap<RouteChannel, RouteIdentity> = HashMap::new();
1302 let mut bg_subs: HashMap<RouteChannel, BgSub> = HashMap::new();
1303 let mut bg_sub_by_session: HashMap<(ProjectRootId, String), RouteChannel> = HashMap::new();
1304 let mut bg_wake_pending: HashSet<RouteChannel> = HashSet::new();
1305 let mut bg_wake_epoch: HashMap<(ProjectRootId, String), u64> = HashMap::new();
1306 let mut root_channels: HashMap<ProjectRootId, HashSet<RouteChannel>> = HashMap::new();
1307 let mut session_identity: HashMap<(ProjectRootId, String), RetainedSessionIdentity> =
1308 HashMap::new();
1309 let mut push_buffer: HashMap<ReplayKey, VecDeque<PushFrame>> = HashMap::new();
1310 let mut retry_buffer: RetryBuffer = HashMap::new();
1311 let mut completed_tasks = CompletedTaskIds::default();
1312 let mut live_roots: HashMap<ProjectRootId, RootMeta> = HashMap::new();
1313 let mut pending_binds: HashMap<RouteChannel, PendingBind> = HashMap::new();
1314 let mut route_bash_cancels: HashMap<RouteChannel, RouteBashCancel> = HashMap::new();
1315
// --- Main loop: the value passed to `break` becomes `loop_result` ----------
1316 let loop_result: Result<(), SubcError> = loop {
1317 tokio::select! {
// Teardown requested through the shared `shutdown` Notify.
1318 _ = shutdown.notified() => {
1319 log::warn!("subc attach: fatal executor response requested teardown");
1320 break Ok(());
1321 }
// Inbound frame (or read error / end of stream) from the reader task.
1322 maybe_frame = reader_rx.recv() => {
1323 let frame = match maybe_frame {
// The reader task dropped its sender without reporting an error.
1324 None => {
1325 log::info!("subc attach: daemon closed connection");
1326 break Ok(());
1327 }
1328 Some(Err(error)) => break Err(error),
1329 Some(Ok(frame)) => frame,
1330 };
1331
1332 match frame.header.ty {
// Channel-0 Ping: answer with an empty Pong that echoes the
// request's version, flags and correlation id.
1333 FrameType::Ping if frame.header.channel == 0 => {
1334 let pong = match Frame::build_with_version(
1335 frame.header.ver,
1336 FrameType::Pong,
1337 frame.header.flags,
1338 0,
1339 frame.header.corr,
1340 Vec::new(),
1341 ) {
1342 Ok(pong) => pong,
1343 Err(error) => break Err(SubcError::FrameBuild(error)),
1344 };
1345 if let Err(error) = send_frame(&writer_tx, pong).await {
1346 break Err(error);
1347 }
1348 }
// Channel-0 Goodbye ends the whole session.
1349 FrameType::Goodbye if frame.header.channel == 0 => {
1350 log::info!("subc attach: received channel-0 Goodbye");
1351 break Ok(());
1352 }
// Goodbye on any other channel tears down just that route.
1353 FrameType::Goodbye => {
1354 let channel = route_key(frame.header.channel);
1355 end_bg_subscription(
1356 &writer_tx,
1357 &mut bg_subs,
1358 &mut bg_sub_by_session,
1359 &mut bg_wake_pending,
1360 channel,
1361 routes.get(&channel),
1362 );
// Cancel the route's bash cancel token, if one was registered.
1363 if let Some(cancel) = route_bash_cancels.remove(&channel) {
1364 cancel.token.cancel();
1365 }
// A bind still in flight is only flagged here; the entry stays in
// `pending_binds` until its completion arrives.
1366 if let Some(pending) = pending_binds.get_mut(&channel) {
1367 pending.cancelled = true;
1368 log::debug!(
1369 "subc attach: cancelled pending RouteBind for route {} on Goodbye",
1370 frame.header.channel
1371 );
1372 }
// Move this route's retry-buffered frames into `push_buffer` before
// the route entry itself is removed.
1373 let migrated = migrate_retry_buffer_to_push_buffer(
1374 &mut retry_buffer,
1375 channel,
1376 &mut push_buffer,
1377 );
1378 if let Some(identity) = remove_route_channel(&mut routes, &mut root_channels, channel) {
1379 if migrated > 0 {
1380 log::debug!(
1381 "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from route {} into detach replay",
1382 frame.header.channel
1383 );
1384 }
1385 if let Some(meta) = live_roots.get_mut(&identity.root) {
// Capture the idle time before `touch()` updates the root.
1386 let idle_for = meta.last_touched.elapsed();
1387 meta.touch();
1388 log::debug!(
1389 "subc attach: route {} torn down for root {} harness {} session {} (last touched {:?} ago)",
1390 frame.header.channel,
1391 identity.root.as_path().display(),
1392 identity.harness,
1393 identity.session,
1394 idle_for
1395 );
1396 } else {
1397 log::debug!(
1398 "subc attach: route {} torn down for root {} harness {} session {}",
1399 frame.header.channel,
1400 identity.root.as_path().display(),
1401 identity.harness,
1402 identity.session
1403 );
1404 }
1405 } else {
1406 if migrated > 0 {
1407 log::debug!(
1408 "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from unbound route {} into detach replay",
1409 frame.header.channel
1410 );
1411 }
1412 log::debug!("subc attach: unbound route {} torn down", frame.header.channel);
1413 }
1414 }
// Channel-0 Request: module control request. Any error it returns
// ends the session.
1415 FrameType::Request if frame.header.channel == 0 => {
1416 if let Err(error) = handle_control_request(
1417 &writer_tx,
1418 &frame,
1419 &shared_app,
1420 &executor,
1421 &mut live_roots,
1422 &mut pending_binds,
1423 &control_completion_tx,
1424 &push_senders,
1425 dispatch,
1426 user_config_path.as_deref(),
1427 )
1428 .await
1429 {
1430 break Err(error);
1431 }
1432 }
// Request on a route channel: tool call.
1433 FrameType::Request => {
1434 if let Err(error) = handle_tool_call(
1435 &writer_tx,
1436 &frame,
1437 &routes,
1438 &pending_binds,
1439 &mut live_roots,
1440 &executor,
1441 &shutdown,
1442 &connection_cancel,
1443 &bash_deferred_tx,
1444 &bash_poll_touch_tx,
1445 &mut route_bash_cancels,
1446 &mut bg_subs,
1447 &mut bg_sub_by_session,
1448 &mut bg_wake_pending,
1449 &mut bg_wake_epoch,
1450 dispatch,
1451 allow_native_passthrough,
1452 )
1453 .await
1454 {
1455 break Err(error);
1456 }
1457 }
// Cancel only acts on a channel that currently has a background
// subscription; otherwise it is a no-op.
1458 FrameType::Cancel => {
1459 let channel = route_key(frame.header.channel);
1460 if bg_subs.contains_key(&channel) {
1461 end_bg_subscription(
1462 &writer_tx,
1463 &mut bg_subs,
1464 &mut bg_sub_by_session,
1465 &mut bg_wake_pending,
1466 channel,
1467 routes.get(&channel),
1468 );
1469 }
1470 }
// Every other frame type is ignored.
1471 _ => {}
1475 }
1476 }
// Reliable push: take the received envelope plus everything already
// queued, then process the batch in arrival order.
1477 Some((root_id, frame)) = reliable_rx.recv() => {
1478 let mut batch = vec![(root_id, frame)];
1482 while let Ok(item) = reliable_rx.try_recv() {
1483 batch.push(item);
1484 }
1485
1486 for (root, frame) in batch {
// When processing yields a (root, session) pair and that session has a
// background subscription, arm a wake for the subscribed channel.
1487 if let Some((root, session)) = process_reliable_push_frame(
1488 &writer_tx,
1489 &routes,
1490 &root_channels,
1491 &session_identity,
1492 &mut retry_buffer,
1493 &mut push_buffer,
1494 &mut completed_tasks,
1495 root,
1496 frame,
1497 ) {
1498 if let Some(channel) = bg_sub_by_session
1499 .get(&(root.clone(), session.clone()))
1500 .copied()
1501 {
1502 arm_bg_wake(
1503 root,
1504 session,
1505 channel,
1506 &mut bg_wake_pending,
1507 &mut bg_wake_epoch,
1508 );
1509 }
1510 }
1511 }
1512 }
// Lossy push: first drain whatever is queued on the reliable channel so
// those frames are processed before this lossy batch.
1513 Some((root_id, frame)) = lossy_rx.recv() => {
1514 while let Ok((reliable_root, reliable_frame)) = reliable_rx.try_recv() {
1518 if let Some((root, session)) = process_reliable_push_frame(
1519 &writer_tx,
1520 &routes,
1521 &root_channels,
1522 &session_identity,
1523 &mut retry_buffer,
1524 &mut push_buffer,
1525 &mut completed_tasks,
1526 reliable_root,
1527 reliable_frame,
1528 ) {
1529 if let Some(channel) = bg_sub_by_session
1530 .get(&(root.clone(), session.clone()))
1531 .copied()
1532 {
1533 arm_bg_wake(
1534 root,
1535 session,
1536 channel,
1537 &mut bg_wake_pending,
1538 &mut bg_wake_epoch,
1539 );
1540 }
1541 }
1542 }
1543
// Gather the queued lossy envelopes and pass the batch through
// `coalesce_push_batch` before processing each resulting frame.
1544 let mut batch = vec![(root_id, frame)];
1548 while let Ok(item) = lossy_rx.try_recv() {
1549 batch.push(item);
1550 }
1551
1552 for (root, frame) in coalesce_push_batch(batch) {
1553 process_lossy_push_frame(
1554 &writer_tx,
1555 &routes,
1556 &root_channels,
1557 &completed_tasks,
1558 root,
1559 frame,
1560 );
1561 }
1562 }
// A RouteBind started by a control request has finished its async work.
1563 Some(completion) = control_completion_rx.recv() => {
1564 if let Err(error) = handle_route_bind_completion(
1565 &writer_tx,
1566 completion,
1567 &mut routes,
1568 &mut root_channels,
1569 &mut session_identity,
1570 &mut push_buffer,
1571 &mut live_roots,
1572 &mut pending_binds,
1573 &executor,
1574 &shutdown,
1575 )
1576 .await
1577 {
1578 break Err(error);
1579 }
1580 }
// A deferred bash call has completed.
1581 Some(done) = bash_deferred_rx.recv() => {
1582 if let Err(error) = handle_bash_deferred_completion(
1583 &writer_tx,
1584 done,
1585 &routes,
1586 &mut live_roots,
1587 &mut route_bash_cancels,
1588 &shutdown,
1589 )
1590 .await
1591 {
1592 break Err(error);
1593 }
1594 }
// Touch the named root if it is still live; unknown roots are ignored.
1595 Some(root_id) = bash_poll_touch_rx.recv() => {
1596 if let Some(meta) = live_roots.get_mut(&root_id) {
1597 meta.touch();
1598 }
1599 }
// A maintenance drain finished: clear the root's pending flag so the
// next tick may schedule another one.
1600 Some(completion) = maintenance_rx.recv() => {
1601 let root_id = completion.root_id;
1602 let response = completion.response;
1603 if let Some(meta) = live_roots.get_mut(&root_id) {
1604 meta.maintenance_pending = false;
1605 }
1606 clear_stale_bg_wakes_for_empty_sessions(
1607 &root_id,
1608 &completion.empty_bg_sessions,
1609 &bg_sub_by_session,
1610 &mut bg_wake_pending,
1611 &bg_wake_epoch,
1612 );
1613 if response_is_fatal_panic(&response) {
1614 signal_fatal_teardown(&writer_tx, None, PROTOCOL_VERSION, 0, &shutdown).await;
1615 }
1616 }
// Timer tick (250 ms interval): emit background wakes, retry buffered
// reliable frames, and schedule maintenance drains.
1617 _ = drain_interval.tick() => {
1618 emit_bg_event_wakes(&writer_tx, &bg_subs, &mut bg_wake_pending);
1619
1620 let retried = drain_retry_buffers_for_bound_routes(
1621 &writer_tx,
1622 &routes,
1623 &mut retry_buffer,
1624 );
1625 if retried > 0 {
1626 log::debug!(
1627 "subc attach: retried {retried} reliable Push frame(s) after writer backpressure"
1628 );
1629 }
1630
// Select every live root without a maintenance drain in flight, and mark
// it pending in the same pass so a root has at most one drain outstanding.
1631 let due_roots: Vec<ProjectRootId> = live_roots
1632 .iter_mut()
1633 .filter_map(|(root_id, meta)| {
1634 if meta.maintenance_pending {
1635 None
1636 } else {
1637 meta.maintenance_pending = true;
1638 Some(root_id.clone())
1639 }
1640 })
1641 .collect();
1642 for root_id in due_roots {
// Pair each background-subscribed session of this root with its current
// wake epoch; sessions with no recorded epoch use 0.
1643 let bg_sessions_to_check: Vec<(String, u64)> = bg_sub_by_session
1644 .iter()
1645 .filter_map(|((root, session), _)| {
1646 if root == &root_id {
1647 Some((
1648 session.clone(),
1649 bg_wake_epoch
1650 .get(&(root_id.clone(), session.clone()))
1651 .copied()
1652 .unwrap_or(0),
1653 ))
1654 } else {
1655 None
1656 }
1657 })
1658 .collect();
1659 submit_maintenance_drain(
1660 &executor,
1661 root_id,
1662 bg_sessions_to_check,
1663 &maintenance_tx,
1664 );
1665 }
1666 }
1667 }
1668 };
1669
// --- Teardown --------------------------------------------------------------
// Cancel the connection-wide signal, stop the reader, and drop the loop's
// sender so the writer task's receive loop can end once all senders are gone.
1670 connection_cancel.cancel();
1673 reader_task.abort();
1674 drop(writer_tx);
1675 let writer_result = finish_writer_task(writer_task).await;
// `and` keeps a loop error if there is one; otherwise the writer's result wins.
1676 loop_result.and(writer_result)
1677}
1678
1679fn spawn_writer_task<W>(
1680 mut write: W,
1681 mut rx: mpsc::Receiver<Frame>,
1682) -> JoinHandle<Result<(), subc_transport::FrameIoError>>
1683where
1684 W: AsyncWrite + Unpin + Send + 'static,
1685{
1686 tokio::spawn(async move {
1687 while let Some(frame) = rx.recv().await {
1688 write_frame(&mut write, &frame).await?;
1689 }
1690 Ok(())
1691 })
1692}
1693
1694fn spawn_reader_task<R>(mut read: R, tx: mpsc::Sender<Result<Frame, SubcError>>) -> JoinHandle<()>
1701where
1702 R: AsyncRead + Unpin + Send + 'static,
1703{
1704 tokio::spawn(async move {
1705 loop {
1706 match read_frame(&mut read).await {
1707 Ok(Some(frame)) => {
1708 if tx.send(Ok(frame)).await.is_err() {
1709 return;
1710 }
1711 }
1712 Ok(None) => {
1713 return;
1715 }
1716 Err(error) => {
1717 let _ = tx.send(Err(SubcError::FrameIo(error))).await;
1718 return;
1719 }
1720 }
1721 }
1722 })
1723}
1724
1725async fn finish_writer_task(
1726 mut writer_task: JoinHandle<Result<(), subc_transport::FrameIoError>>,
1727) -> Result<(), SubcError> {
1728 match tokio::time::timeout(Duration::from_millis(100), &mut writer_task).await {
1729 Ok(Ok(Ok(()))) => Ok(()),
1730 Ok(Ok(Err(error))) => Err(SubcError::FrameIo(error)),
1731 Ok(Err(error)) => Err(SubcError::WriterJoin(error)),
1732 Err(_) => {
1733 writer_task.abort();
1734 Ok(())
1735 }
1736 }
1737}
1738
1739async fn send_frame(tx: &mpsc::Sender<Frame>, frame: Frame) -> Result<(), SubcError> {
1740 match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(frame)).await {
1741 Ok(Ok(())) => Ok(()),
1742 Ok(Err(_)) => Err(SubcError::WriterClosed),
1743 Err(_) => Err(SubcError::WriterBackpressureTimeout),
1744 }
1745}
1746
1747fn rollback_pending_bind_actor(
1748 executor: &Arc<Executor>,
1749 live_roots: &HashMap<ProjectRootId, RootMeta>,
1750 root_id: &ProjectRootId,
1751 inserted_new_actor: bool,
1752) {
1753 if inserted_new_actor && !live_roots.contains_key(root_id) {
1754 executor.remove_actor(root_id);
1755 }
1756}
1757
1758async fn handle_route_bind_completion(
1759 tx: &mpsc::Sender<Frame>,
1760 completion: RouteBindCompletion,
1761 routes: &mut HashMap<RouteChannel, RouteIdentity>,
1762 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
1763 session_identity: &mut HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
1764 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1765 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1766 pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1767 executor: &Arc<Executor>,
1768 shutdown: &Arc<Notify>,
1769) -> Result<(), SubcError> {
1770 let route_id = route_key(completion.route_channel);
1771 let Some(pending) = pending_binds.remove(&route_id) else {
1772 log::warn!(
1773 "subc attach: dropping RouteBind completion for non-pending route {}",
1774 completion.route_channel
1775 );
1776 rollback_pending_bind_actor(
1777 executor,
1778 live_roots,
1779 &completion.bind_root_id,
1780 completion.inserted_new_actor,
1781 );
1782 return Ok(());
1783 };
1784
1785 if pending.bind_root_id != completion.bind_root_id {
1786 log::warn!(
1787 "subc attach: pending RouteBind root mismatch for route {} (pending {} completion {})",
1788 completion.route_channel,
1789 pending.bind_root_id.as_path().display(),
1790 completion.bind_root_id.as_path().display()
1791 );
1792 }
1793
1794 let inserted_new_actor = pending.inserted_new_actor || completion.inserted_new_actor;
1795 if pending.cancelled {
1796 rollback_pending_bind_actor(
1797 executor,
1798 live_roots,
1799 &completion.bind_root_id,
1800 inserted_new_actor,
1801 );
1802 log::debug!(
1803 "subc attach: discarded completed RouteBind for cancelled route {} root {}",
1804 completion.route_channel,
1805 completion.bind_root_id.as_path().display()
1806 );
1807 return Ok(());
1808 }
1809
1810 let failure = if !completion.configure_response.success {
1811 Some((
1812 &completion.configure_response,
1813 "configure failed during route bind",
1814 ))
1815 } else if let Some(drain_response) = completion.drain_response.as_ref() {
1816 if drain_response.success {
1817 None
1818 } else {
1819 Some((
1820 drain_response,
1821 "build-completion drain failed during route bind",
1822 ))
1823 }
1824 } else {
1825 None
1826 };
1827
1828 if let Some((response, fallback)) = failure {
1829 rollback_pending_bind_actor(
1830 executor,
1831 live_roots,
1832 &completion.bind_root_id,
1833 inserted_new_actor,
1834 );
1835 let message = response_message(response, fallback);
1836 let fatal = response_is_fatal_panic(response);
1837 send_route_bind_error_parts(
1838 tx,
1839 completion.ver,
1840 completion.corr,
1841 completion.flags,
1842 "config_divergence",
1843 &message,
1844 )
1845 .await?;
1846 if fatal {
1847 signal_fatal_teardown(
1848 tx,
1849 Some(completion.route_channel),
1850 completion.ver,
1851 completion.corr,
1852 shutdown,
1853 )
1854 .await;
1855 }
1856 return Ok(());
1857 }
1858
1859 remember_session_identity(session_identity, &completion.identity);
1860 let replay_key = ReplayKey::from_identity(&completion.identity);
1861 let bind_trust = completion.identity.trust;
1862 insert_route_channel(routes, root_channels, route_id, completion.identity);
1863 live_roots
1864 .entry(completion.bind_root_id.clone())
1865 .and_modify(|meta| {
1866 meta.touch();
1867 meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1868 })
1869 .or_insert_with(|| RootMeta::new(Instant::now()));
1870 if let Some(meta) = live_roots.get_mut(&completion.bind_root_id) {
1871 meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1872 }
1873
1874 let ack =
1875 serde_json::to_vec(&ModuleControlResponse::RouteBindAck {}).map_err(SubcError::Json)?;
1876 let response = Frame::build_with_version(
1877 completion.ver,
1878 FrameType::Response,
1879 control_flags(),
1880 0,
1881 completion.corr,
1882 ack,
1883 )
1884 .map_err(SubcError::FrameBuild)?;
1885 send_frame(tx, response).await?;
1886 let replayed = replay_buffered_push_frames(tx, route_id, push_buffer, &replay_key, bind_trust);
1887 if replayed > 0 {
1888 log::debug!(
1889 "subc attach: replayed {} buffered Push frame(s) to route {} root {} harness {} session {}",
1890 replayed,
1891 completion.route_channel,
1892 replay_key.root.as_path().display(),
1893 replay_key.harness,
1894 replay_key.session
1895 );
1896 }
1897 log::info!(
1898 "subc attach: route {} bound to root {}",
1899 completion.route_channel,
1900 completion.bind_root_id.as_path().display()
1901 );
1902 Ok(())
1903}
1904
/// Handles a channel-0 module control request.
///
/// The only request handled is `RouteBind`. The bind is validated and started
/// here but finishes asynchronously: a spawned task awaits the executor
/// response(s) and then sends a `RouteBindCompletion` on `control_completion_tx`.
///
/// Validation failures (bind already pending for the channel, invalid project
/// root, configure request that cannot be built) are answered with a
/// `config_divergence` error frame and do not fail this function.
///
/// # Errors
/// Returns `SubcError::Json` when the frame body is not a valid
/// `ModuleControlRequest`, or the error from sending a rejection frame.
1905async fn handle_control_request(
1910 tx: &mpsc::Sender<Frame>,
1911 frame: &Frame,
1912 shared_app: &Arc<App>,
1913 executor: &Arc<Executor>,
1914 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1915 pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1916 control_completion_tx: &mpsc::Sender<RouteBindCompletion>,
1917 push_senders: &PushSenders,
1918 dispatch: DispatchFn,
1919 user_config_path: Option<&Path>,
1920) -> Result<(), SubcError> {
1921 let request =
1922 serde_json::from_slice::<ModuleControlRequest>(&frame.body).map_err(SubcError::Json)?;
1923 match request {
1924 ModuleControlRequest::RouteBind {
1925 route_channel,
1926 target: _,
1927 identity,
1928 principal,
1929 } => {
// Only one bind may be in flight per route channel.
1930 let route_id = route_key(route_channel);
1931 if pending_binds.contains_key(&route_id) {
1932 return send_route_bind_error(
1933 tx,
1934 frame,
1935 "config_divergence",
1936 "route bind is already pending for channel",
1937 )
1938 .await;
1939 }
1940
1941 let bind_root_id = match ProjectRootId::from_path(&identity.project_root) {
1942 Ok(root_id) => root_id,
1943 Err(error) => {
1944 return send_route_bind_error(
1945 tx,
1946 frame,
1947 "config_divergence",
1948 &format!("invalid route project root: {error}"),
1949 )
1950 .await;
1951 }
1952 };
1953
1954 let request_id = format!("subc-bind-{route_channel}");
1957 let bind_project_root = identity.project_root.clone();
1958 let bind_harness = identity.harness.clone();
1959 let bind_session = identity.session.clone();
// The trust level of the route is derived from the request's principal.
1960 let bind_trust = trust_for_principal(&principal);
1961 log::info!(
1962 "subc attach: route {} principal={} trust={}",
1963 route_channel,
1964 principal_label(&principal),
1965 bind_trust.label()
1966 );
1967
// Read the local config tiers for this project root and embed them in a
// synthetic `configure` request.
1968 let local_tiers = crate::subc_config::read_local_cortexkit_config_tiers(
1980 user_config_path,
1981 Path::new(&bind_project_root),
1982 );
1983 let config_tiers: Vec<Value> = local_tiers
1984 .iter()
1985 .map(|t| json!({ "tier": t.tier, "source": t.source, "doc": t.doc }))
1986 .collect();
1987 let diagnostics_on_edit = diagnostics_on_edit_from_tiers(&local_tiers);
1988 let configure_json = json!({
1989 "id": request_id,
1990 "command": "configure",
1991 "project_root": bind_project_root,
1992 "harness": bind_harness,
1993 "session_id": bind_session.clone(),
1994 "config": config_tiers,
1995 });
1996 let configure_req = match serde_json::from_value::<RawRequest>(configure_json) {
1997 Ok(req) => req,
1998 Err(error) => {
1999 return send_route_bind_error(
2000 tx,
2001 frame,
2002 "config_divergence",
2003 &format!("failed to build configure request: {error}"),
2004 )
2005 .await;
2006 }
2007 };
2008
2009 let route_identity = RouteIdentity {
2010 root: bind_root_id.clone(),
2011 project_root: PathBuf::from(&bind_project_root),
2012 harness: bind_harness.clone(),
2013 session: bind_session.clone(),
2014 trust: bind_trust,
2015 };
2016 let configure_session = route_identity.session.clone();
// Reuse the actor when the root is already live; otherwise build a fresh
// context and register it. `inserted_new_actor` records the value returned by
// `register_actor` so an abandoned bind can roll the registration back.
2017 let root_was_live = live_roots.contains_key(&bind_root_id);
2018 let inserted_new_actor = if root_was_live {
2019 log::debug!(
2020 "subc attach: reusing actor for route {} root {}",
2021 route_channel,
2022 bind_root_id.as_path().display()
2023 );
2024 false
2025 } else {
2026 let actor_ctx = Arc::new(AppContext::from_app(
2027 Arc::clone(shared_app),
2028 Config::default(),
2029 ));
2030 install_bash_compressor(&actor_ctx);
2031 actor_ctx.set_progress_sender(Some(progress_sender_for_root(
2032 push_senders.clone(),
2033 bind_root_id.clone(),
2034 )));
2035 let inserted =
2036 executor.register_actor(bind_root_id.clone(), Arc::clone(&actor_ctx));
2037 drop(actor_ctx);
2038 log::debug!(
2042 "subc attach: registered actor for route {} root {}",
2043 route_channel,
2044 bind_root_id.as_path().display()
2045 );
2046 inserted
2047 };
2048
// Record the bind as pending before any work is submitted.
2049 pending_binds.insert(
2050 route_id,
2051 PendingBind {
2052 bind_root_id: bind_root_id.clone(),
2053 inserted_new_actor,
2054 cancelled: false,
2055 },
2056 );
2057
// Submit the configure request on the mutating lane, dispatched with the
// bind's session set as the logging session.
2058 let configure_request_id = configure_req.id.clone();
2059 let configure_rx = executor.submit_async(
2060 bind_root_id.clone(),
2061 Lane::Mutating,
2062 configure_request_id.clone(),
2063 Box::new(move |ctx| {
2064 log_ctx::with_session(Some(configure_session.clone()), || {
2065 dispatch(configure_req, ctx)
2066 })
2067 }),
2068 );
2069
// Everything the spawned task needs is cloned or copied out of the borrowed
// arguments first.
2070 let completion_tx = control_completion_tx.clone();
2071 let completion_executor = Arc::clone(executor);
2072 let completion_identity = route_identity;
2073 let completion_root = bind_root_id.clone();
2074 let completion_route_channel = route_channel;
2075 let completion_ver = frame.header.ver;
2076 let completion_corr = frame.header.corr;
2077 let completion_flags = frame.header.flags;
2078 tokio::spawn(async move {
2079 let configure_response =
2080 await_executor_response(configure_rx, configure_request_id.clone()).await;
// Only a root that was not live before this bind gets a build-completion
// drain, and only after configure succeeded.
2081 let drain_response = if configure_response.success && !root_was_live {
2082 let drain_request_id = format!("subc-bind-drain-{completion_route_channel}");
2083 let drain_response_id = drain_request_id.clone();
2084 let drain_rx = completion_executor.submit_async(
2085 completion_root.clone(),
2086 Lane::Mutating,
2087 drain_request_id.clone(),
2088 Box::new(move |ctx| {
2089 runtime_drain::drain_build_completions(ctx);
2090 Response::success(drain_response_id, json!({ "drained": true }))
2091 }),
2092 );
2093 Some(await_executor_response(drain_rx, drain_request_id).await)
2094 } else {
2095 None
2096 };
2097
2098 let completion = RouteBindCompletion {
2099 route_channel: completion_route_channel,
2100 identity: completion_identity,
2101 bind_root_id: completion_root,
2102 inserted_new_actor,
2103 configure_response,
2104 drain_response,
2105 diagnostics_on_edit,
2106 ver: completion_ver,
2107 corr: completion_corr,
2108 flags: completion_flags,
2109 };
// A failed send means the receiver is gone; the completion is dropped with a
// debug log.
2110 if completion_tx.send(completion).await.is_err() {
2111 log::debug!(
2112 "subc attach: dropped RouteBind completion for route {} after loop exit",
2113 completion_route_channel
2114 );
2115 }
2116 });
2117
2118 Ok(())
2119 }
2120 }
2121}
2122
2123fn install_bash_compressor(ctx: &AppContext) {
2124 let filter_registry_handle = ctx.shared_filter_registry();
2126 let compress_flag = ctx.bash_compress_flag();
2127 ctx.bash_background().set_compressor_with_exit_code(
2128 move |command: &str, output: String, exit_code: Option<i32>| {
2129 if !compress_flag.load(std::sync::atomic::Ordering::Relaxed) {
2130 return crate::compress::CompressionResult::new(output);
2131 }
2132 let registry_guard = match filter_registry_handle.read() {
2133 Ok(g) => g,
2134 Err(poisoned) => poisoned.into_inner(),
2135 };
2136 crate::compress::compress_with_registry_exit_code(
2137 command,
2138 &output,
2139 exit_code,
2140 ®istry_guard,
2141 )
2142 },
2143 );
2144}
2145
2146fn diagnostics_on_edit_from_tiers(tiers: &[ConfigTier]) -> bool {
2147 let mut diagnostics_on_edit = false;
2148 for tier in tiers {
2149 if let Some(value) = diagnostics_on_edit_from_doc(&tier.doc) {
2150 diagnostics_on_edit = value;
2151 }
2152 }
2153 diagnostics_on_edit
2154}
2155
2156fn diagnostics_on_edit_from_doc(doc: &str) -> Option<bool> {
2157 let stripped = strip_jsonc(doc);
2158 let value = serde_json::from_str::<Value>(&stripped).ok()?;
2159 value
2160 .get("lsp")
2161 .and_then(Value::as_object)?
2162 .get("diagnostics_on_edit")
2163 .and_then(Value::as_bool)
2164}
2165
2166async fn send_route_bind_error(
2167 tx: &mpsc::Sender<Frame>,
2168 frame: &Frame,
2169 code: &str,
2170 message: &str,
2171) -> Result<(), SubcError> {
2172 send_route_bind_error_parts(
2173 tx,
2174 frame.header.ver,
2175 frame.header.corr,
2176 frame.header.flags,
2177 code,
2178 message,
2179 )
2180 .await
2181}
2182
2183async fn send_route_bind_error_parts(
2184 tx: &mpsc::Sender<Frame>,
2185 ver: u8,
2186 corr: u64,
2187 flags: Flags,
2188 code: &str,
2189 message: &str,
2190) -> Result<(), SubcError> {
2191 let response = build_error_frame(ver, 0, corr, flags, code, message)?;
2192 send_frame(tx, response).await?;
2193 log::warn!("subc attach: route bind rejected ({code}): {message}");
2194 Ok(())
2195}
2196
2197async fn handle_tool_call(
2202 tx: &mpsc::Sender<Frame>,
2203 frame: &Frame,
2204 routes: &HashMap<RouteChannel, RouteIdentity>,
2205 pending_binds: &HashMap<RouteChannel, PendingBind>,
2206 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
2207 executor: &Arc<Executor>,
2208 shutdown: &Arc<Notify>,
2209 connection_cancel: &PersistentCancelSignal,
2210 bash_deferred_tx: &mpsc::Sender<BashDeferredCompletion>,
2211 bash_poll_touch_tx: &mpsc::Sender<ProjectRootId>,
2212 route_bash_cancels: &mut HashMap<RouteChannel, RouteBashCancel>,
2213 bg_subs: &mut HashMap<RouteChannel, BgSub>,
2214 bg_sub_by_session: &mut HashMap<(ProjectRootId, String), RouteChannel>,
2215 bg_wake_pending: &mut HashSet<RouteChannel>,
2216 bg_wake_epoch: &mut HashMap<(ProjectRootId, String), u64>,
2217 dispatch: DispatchFn,
2218 allow_native_passthrough: bool,
2219) -> Result<(), SubcError> {
2220 let route_id = route_key(frame.header.channel);
2221 if pending_binds.contains_key(&route_id) {
2222 let error = build_error_frame(
2223 frame.header.ver,
2224 frame.header.channel,
2225 frame.header.corr,
2226 frame.header.flags,
2227 "route_not_bound",
2228 "route is not bound before tool call",
2229 )?;
2230 return send_frame(tx, error).await;
2231 }
2232
2233 let Some(identity) = routes.get(&route_id).cloned() else {
2234 let error = build_error_frame(
2235 frame.header.ver,
2236 frame.header.channel,
2237 frame.header.corr,
2238 frame.header.flags,
2239 "route_not_bound",
2240 "route is not bound before tool call",
2241 )?;
2242 return send_frame(tx, error).await;
2243 };
2244 if let Some(meta) = live_roots.get_mut(&identity.root) {
2245 meta.touch();
2246 }
2247
2248 let is_bg_events_subscribe = serde_json::from_slice::<BgEventsProbe>(&frame.body)
2249 .ok()
2250 .and_then(|probe| probe.op)
2251 .as_deref()
2252 == Some("bg_events");
2253 if is_bg_events_subscribe {
2254 if let Some(old_sub) = bg_subs.get(&route_id).copied() {
2255 let _ = try_send_bg_stream_end(tx, route_id, &old_sub);
2256 }
2257 if !identity.trust.allows_bash_observation() {
2258 bg_subs.remove(&route_id);
2259 bg_wake_pending.remove(&route_id);
2260 remove_bg_subscription_index(bg_sub_by_session, route_id, Some(&identity));
2261 return Ok(());
2262 }
2263 bg_subs.insert(
2264 route_id,
2265 BgSub {
2266 corr: frame.header.corr,
2267 ver: frame.header.ver,
2268 flags: frame.header.flags,
2269 },
2270 );
2271 bg_sub_by_session.insert((identity.root.clone(), identity.session.clone()), route_id);
2272 arm_bg_wake(
2273 identity.root,
2274 identity.session,
2275 route_id,
2276 bg_wake_pending,
2277 bg_wake_epoch,
2278 );
2279 return Ok(());
2280 }
2281
2282 let call = serde_json::from_slice::<ToolCallRequest>(&frame.body).map_err(SubcError::Json)?;
2283 let bare_name = call.name.clone();
2284 let format_context = crate::subc_format::FormatContext::from_tool_call(
2285 &bare_name,
2286 &call.arguments,
2287 identity.project_root.as_path(),
2288 );
2289
2290 let request_id = format!("subc-{}-{}", frame.header.channel, frame.header.corr);
2291 let bind_trust = identity.trust;
2292 let diagnostics_on_edit = live_roots
2293 .get(&identity.root)
2294 .map(|meta| meta.diagnostics_on_edit)
2295 .unwrap_or(false);
2296
2297 if matches!(bind_trust, BindTrust::Untrusted) && is_bash_family_tool(&bare_name) {
2298 let response = bash_denied_untrusted_response(request_id.clone());
2299 let text = crate::subc_format::format_response_with_context(
2300 &bare_name,
2301 &response,
2302 &format_context,
2303 );
2304 let result = ToolCallResult { text, response };
2305 let response_frame = build_tool_response_frame(
2306 frame.header.ver,
2307 frame.header.channel,
2308 frame.header.corr,
2309 frame.header.flags,
2310 &result,
2311 )?;
2312 return send_frame(tx, response_frame).await;
2313 }
2314
2315 if !is_subc_agent_core_tool(&call.name)
2323 && !is_subc_native_plumbing_tool(&call.name)
2324 && !allow_native_passthrough
2325 {
2326 log::warn!(
2327 "subc tool call: rejecting non-manifest tool name {:?} on route {} (fail-closed)",
2328 call.name,
2329 frame.header.channel
2330 );
2331 let response = Response::error(
2332 request_id.clone(),
2333 "unknown_tool",
2334 format!("tool {:?} is not in the AFT tool manifest", call.name),
2335 );
2336 let text = crate::subc_format::format_response_with_context(
2337 &bare_name,
2338 &response,
2339 &format_context,
2340 );
2341 let result = ToolCallResult { text, response };
2342 let response_frame = build_tool_response_frame(
2343 frame.header.ver,
2344 frame.header.channel,
2345 frame.header.corr,
2346 frame.header.flags,
2347 &result,
2348 )?;
2349 return send_frame(tx, response_frame).await;
2350 }
2351
2352 if bare_name == "bash" {
2353 let meta = live_roots
2354 .entry(identity.root.clone())
2355 .or_insert_with(|| RootMeta::new(Instant::now()));
2356 meta.active_bash_waits = meta.active_bash_waits.saturating_add(1);
2357 meta.touch();
2358
2359 let route_cancel = route_bash_cancels
2360 .entry(route_id)
2361 .or_insert_with(|| RouteBashCancel {
2362 token: PersistentCancelSignal::new(),
2363 active_waits: 0,
2364 });
2365 route_cancel.active_waits = route_cancel.active_waits.saturating_add(1);
2366 let cancel = BashWaitCancel {
2367 connection: connection_cancel.clone(),
2368 route: route_cancel.token.clone(),
2369 };
2370
2371 submit_deferred_bash(
2372 executor,
2373 bash_deferred_tx,
2374 bash_poll_touch_tx,
2375 dispatch,
2376 identity.root,
2377 identity.project_root,
2378 identity.session,
2379 request_id,
2380 frame.header.channel,
2381 frame.header.corr,
2382 frame.header.flags,
2383 frame.header.ver,
2384 call.arguments,
2385 format_context,
2386 cancel,
2387 bind_trust,
2388 );
2389 return Ok(());
2390 }
2391
2392 let lane = command_lane(&bare_name);
2393 let tool_call_context = ToolCallContext {
2394 project_root: identity.project_root.clone(),
2395 session_id: Some(identity.session.clone()),
2396 request_id: request_id.clone(),
2397 diagnostics_on_edit,
2398 preview: false,
2399 };
2400 let arguments_for_run = call.arguments.clone();
2401 let bare_name_for_run = bare_name.clone();
2402 let bare_name_for_frame = bare_name.clone();
2403 let bare_name_for_finalize = bare_name.clone();
2404 let session_for_log = identity.session.clone();
2405 let session_for_finalize = identity.session.clone();
2406 let request_id_for_force = request_id.clone();
2407 let format_context_for_frame = format_context;
2408 let (text_tx, text_rx) = oneshot::channel::<String>();
2409 let rx = executor.submit_async(
2410 identity.root,
2411 lane,
2412 request_id.clone(),
2413 Box::new(move |ctx| {
2414 log_ctx::with_session(Some(session_for_log.clone()), || {
2415 let run = || {
2416 let dispatch_with_finalize = |raw_req: RawRequest, app_ctx: &AppContext| {
2417 let mut response = dispatch(raw_req, app_ctx);
2418 crate::response_finalize::finalize_response_with_bg_completions(
2419 &mut response,
2420 app_ctx,
2421 &session_for_finalize,
2422 &bare_name_for_finalize,
2423 bind_trust.allows_bash_observation(),
2424 );
2425 response
2426 };
2427 match run_tool_call(
2428 &bare_name_for_run,
2429 &arguments_for_run,
2430 &tool_call_context,
2431 ctx,
2432 &dispatch_with_finalize,
2433 ) {
2434 ToolCallOutcome::Unary(result) => {
2435 let _ = text_tx.send(result.text);
2436 result.response
2437 }
2438 }
2439 };
2440 if matches!(bind_trust, BindTrust::Untrusted) {
2441 ctx.with_force_restrict(&request_id_for_force, run)
2442 } else {
2443 run()
2444 }
2445 })
2446 }),
2447 );
2448 let completion_tx = tx.clone();
2449 let completion_shutdown = Arc::clone(shutdown);
2450 let route_channel = frame.header.channel;
2451 let corr = frame.header.corr;
2452 let flags = frame.header.flags;
2453 let ver = frame.header.ver;
2454 tokio::spawn(async move {
2455 let response = await_executor_response(rx, request_id.clone()).await;
2456 let text = text_rx.await.unwrap_or_else(|_| {
2457 crate::subc_format::format_response_with_context(
2458 &bare_name_for_frame,
2459 &response,
2460 &format_context_for_frame,
2461 )
2462 });
2463 let result = ToolCallResult { text, response };
2464 let fatal = response_is_fatal_panic(&result.response);
2465 match build_tool_response_frame(ver, route_channel, corr, flags, &result) {
2466 Ok(response_frame) => {
2467 let _ = completion_tx.send(response_frame).await;
2468 }
2469 Err(error) => {
2470 log::error!("subc attach: failed to build tool response frame: {error}");
2471 }
2472 }
2473 if fatal {
2474 signal_fatal_teardown(
2475 &completion_tx,
2476 Some(route_channel),
2477 ver,
2478 corr,
2479 &completion_shutdown,
2480 )
2481 .await;
2482 }
2483 });
2484 Ok(())
2485}
2486
/// Options read back out of the translated `bash` arguments.
///
/// Populated by `bash_settings_from_translated`; every boolean is `false` and
/// `timeout` is `None` when the corresponding argument is absent or has the
/// wrong JSON type.
#[derive(Clone, Copy, Debug, Default)]
struct BashTranslatedSettings {
    /// Value of the `background` argument.
    background: bool,
    /// Value of the `pty` argument.
    pty: bool,
    /// Value of the `wait` argument.
    wait: bool,
    /// Value of the `block_to_completion` argument.
    block_to_completion: bool,
    /// Value of the `timeout` argument when it is an unsigned integer.
    timeout: Option<u64>,
}
2495
/// What the spawn stage of a deferred `bash` call tells the awaiting task.
///
/// Sent over a oneshot channel from the executor job created in
/// `submit_deferred_bash` to the task that delivers the completion.
enum BashSpawnControl {
    /// The spawn response is final and can be delivered as-is.
    Immediate,
    /// The task is still running in the foreground; the fields carry what
    /// `run_deferred_bash_wait` needs to keep polling it.
    Foreground {
        /// Task id taken from the spawn response's `task_id` field.
        task_id: String,
        /// Session the call belongs to.
        session_id: String,
        /// Project root read from the app config at spawn time.
        project_root: Option<PathBuf>,
        /// Background-bash storage directory resolved at spawn time.
        storage_dir: PathBuf,
        /// Spawn time plus `wait_window_ms`.
        deadline: Instant,
        /// True when either `block_to_completion` or `wait` was requested.
        block_to_completion: bool,
        /// The `timeout` argument, if one was given.
        timeout: Option<u64>,
        /// Foreground wait window, in milliseconds, chosen at spawn time.
        wait_window_ms: u64,
    },
}
2509
/// What a single poll of a foreground `bash` task tells the wait loop in
/// `run_deferred_bash_wait`.
enum BashPollControl {
    /// The poll produced the final response; deliver it and stop.
    Done,
    /// Hand the task over to `submit_bash_promote`, deliver its result and stop.
    Promote,
    /// Nothing to deliver yet; poll again after the next interval.
    Wait,
}
2515
2516fn bash_settings_from_translated(args: &serde_json::Map<String, Value>) -> BashTranslatedSettings {
2517 BashTranslatedSettings {
2518 background: args
2519 .get("background")
2520 .and_then(Value::as_bool)
2521 .unwrap_or(false),
2522 pty: args.get("pty").and_then(Value::as_bool).unwrap_or(false),
2523 wait: args.get("wait").and_then(Value::as_bool).unwrap_or(false),
2524 block_to_completion: args
2525 .get("block_to_completion")
2526 .and_then(Value::as_bool)
2527 .unwrap_or(false),
2528 timeout: args.get("timeout").and_then(Value::as_u64),
2529 }
2530}
2531
2532fn finalized_bash_result(
2533 mut response: Response,
2534 ctx: &AppContext,
2535 session_id: &str,
2536 format_context: &crate::subc_format::FormatContext,
2537 allow_bg_completions: bool,
2538) -> ToolCallResult {
2539 crate::response_finalize::finalize_response_with_bg_completions(
2540 &mut response,
2541 ctx,
2542 session_id,
2543 "bash",
2544 allow_bg_completions,
2545 );
2546 bash_result_from_response(response, format_context)
2547}
2548
2549fn bash_result_from_response(
2550 response: Response,
2551 format_context: &crate::subc_format::FormatContext,
2552) -> ToolCallResult {
2553 let text = crate::subc_format::format_response_with_context("bash", &response, format_context);
2554 ToolCallResult { text, response }
2555}
2556
2557fn bash_background_launch_response(request_id: &str, task_id: &str, is_pty: bool) -> Response {
2558 Response::success(
2559 request_id,
2560 json!({
2561 "output": crate::commands::bash_orchestrate::format_background_launch(task_id, is_pty),
2562 "task_id": task_id,
2563 "status": "running",
2564 "mode": if is_pty { "pty" } else { "pipes" },
2565 }),
2566 )
2567}
2568
2569fn finish_bash_spawn_immediate(
2570 response: Response,
2571 ctx: &AppContext,
2572 session_id: &str,
2573 format_context: &crate::subc_format::FormatContext,
2574 text_tx: &mut Option<oneshot::Sender<String>>,
2575 control_tx: &mut Option<oneshot::Sender<BashSpawnControl>>,
2576 allow_bg_completions: bool,
2577) -> Response {
2578 let result = finalized_bash_result(
2579 response,
2580 ctx,
2581 session_id,
2582 format_context,
2583 allow_bg_completions,
2584 );
2585 let ToolCallResult { text, response } = result;
2586 if let Some(tx) = text_tx.take() {
2587 let _ = tx.send(text);
2588 }
2589 if let Some(tx) = control_tx.take() {
2590 let _ = tx.send(BashSpawnControl::Immediate);
2591 }
2592 response
2593}
2594
2595fn finish_bash_poll_done(
2596 response: Response,
2597 ctx: &AppContext,
2598 session_id: &str,
2599 format_context: &crate::subc_format::FormatContext,
2600 text_tx: &mut Option<oneshot::Sender<String>>,
2601 control_tx: &mut Option<oneshot::Sender<BashPollControl>>,
2602) -> Response {
2603 let result = finalized_bash_result(response, ctx, session_id, format_context, true);
2604 let ToolCallResult { text, response } = result;
2605 if let Some(tx) = text_tx.take() {
2606 let _ = tx.send(text);
2607 }
2608 if let Some(tx) = control_tx.take() {
2609 let _ = tx.send(BashPollControl::Done);
2610 }
2611 response
2612}
2613
2614#[allow(clippy::too_many_arguments)]
2615fn submit_deferred_bash(
2616 executor: &Arc<Executor>,
2617 completion_tx: &mpsc::Sender<BashDeferredCompletion>,
2618 poll_touch_tx: &mpsc::Sender<ProjectRootId>,
2619 dispatch: DispatchFn,
2620 root: ProjectRootId,
2621 project_root: PathBuf,
2622 session_id: String,
2623 request_id: String,
2624 route_channel: u16,
2625 corr: u64,
2626 flags: Flags,
2627 ver: u8,
2628 arguments: Value,
2629 format_context: crate::subc_format::FormatContext,
2630 cancel: BashWaitCancel,
2631 bind_trust: BindTrust,
2632) {
2633 let (spawn_control_tx, spawn_control_rx) = oneshot::channel::<BashSpawnControl>();
2634 let (spawn_text_tx, spawn_text_rx) = oneshot::channel::<String>();
2635 let root_for_spawn = root.clone();
2636 let request_id_for_spawn = request_id.clone();
2637 let session_for_spawn = session_id.clone();
2638 let project_root_for_spawn = project_root.clone();
2639 let format_context_for_spawn = format_context.clone();
2640 let spawn_rx = executor.submit_async(
2641 root_for_spawn,
2642 Lane::Mutating,
2643 request_id.clone(),
2644 Box::new(move |ctx| {
2645 log_ctx::with_session(Some(session_for_spawn.clone()), || {
2646 let mut spawn_text_tx = Some(spawn_text_tx);
2647 let mut spawn_control_tx = Some(spawn_control_tx);
2648
2649 if matches!(bind_trust, BindTrust::Untrusted) {
2650 let response = bash_denied_untrusted_response(request_id_for_spawn.clone());
2651 return finish_bash_spawn_immediate(
2652 response,
2653 ctx,
2654 &session_for_spawn,
2655 &format_context_for_spawn,
2656 &mut spawn_text_tx,
2657 &mut spawn_control_tx,
2658 false,
2659 );
2660 }
2661
2662 let translated = match crate::subc_translate::subc_translate(
2663 "bash",
2664 &arguments,
2665 &project_root_for_spawn,
2666 ) {
2667 Ok(translated) => translated,
2668 Err(error) => {
2669 let response = Response::error(
2670 request_id_for_spawn.clone(),
2671 error.code,
2672 error.message,
2673 );
2674 return finish_bash_spawn_immediate(
2675 response,
2676 ctx,
2677 &session_for_spawn,
2678 &format_context_for_spawn,
2679 &mut spawn_text_tx,
2680 &mut spawn_control_tx,
2681 true,
2682 );
2683 }
2684 };
2685 let settings = bash_settings_from_translated(&translated.args);
2686 let raw_req = RawRequest {
2687 id: request_id_for_spawn.clone(),
2688 command: "bash".to_string(),
2689 lsp_hints: None,
2690 session_id: Some(session_for_spawn.clone()),
2691 params: Value::Object(translated.args),
2692 };
2693 let response = dispatch(raw_req, ctx);
2694 if !response.success {
2695 return finish_bash_spawn_immediate(
2696 response,
2697 ctx,
2698 &session_for_spawn,
2699 &format_context_for_spawn,
2700 &mut spawn_text_tx,
2701 &mut spawn_control_tx,
2702 true,
2703 );
2704 }
2705
2706 let Some(task_id) = response
2707 .data
2708 .get("task_id")
2709 .and_then(Value::as_str)
2710 .map(str::to_string)
2711 else {
2712 return finish_bash_spawn_immediate(
2713 response,
2714 ctx,
2715 &session_for_spawn,
2716 &format_context_for_spawn,
2717 &mut spawn_text_tx,
2718 &mut spawn_control_tx,
2719 true,
2720 );
2721 };
2722 if response.data.get("status").and_then(Value::as_str) != Some("running") {
2723 return finish_bash_spawn_immediate(
2724 response,
2725 ctx,
2726 &session_for_spawn,
2727 &format_context_for_spawn,
2728 &mut spawn_text_tx,
2729 &mut spawn_control_tx,
2730 true,
2731 );
2732 }
2733
2734 let mode = response
2735 .data
2736 .get("mode")
2737 .and_then(Value::as_str)
2738 .unwrap_or("pipes");
2739 let is_pty = mode == "pty" || settings.pty;
2740 if is_pty || settings.background {
2741 let response =
2742 bash_background_launch_response(&request_id_for_spawn, &task_id, is_pty);
2743 return finish_bash_spawn_immediate(
2744 response,
2745 ctx,
2746 &session_for_spawn,
2747 &format_context_for_spawn,
2748 &mut spawn_text_tx,
2749 &mut spawn_control_tx,
2750 true,
2751 );
2752 }
2753
2754 let wait_window_ms =
2755 crate::commands::bash_orchestrate::select_foreground_wait_window_ms(
2756 ctx.config().foreground_wait_window_ms,
2757 settings.timeout,
2758 settings.wait,
2759 );
2760 let deadline = Instant::now() + Duration::from_millis(wait_window_ms);
2761 let storage_dir =
2762 crate::bash_background::storage_dir(ctx.config().storage_dir.as_deref());
2763 let project_root = ctx.config().project_root.clone();
2764 if let Some(tx) = spawn_control_tx.take() {
2765 let _ = tx.send(BashSpawnControl::Foreground {
2766 task_id,
2767 session_id: session_for_spawn.clone(),
2768 project_root,
2769 storage_dir,
2770 deadline,
2771 block_to_completion: settings.block_to_completion || settings.wait,
2772 timeout: settings.timeout,
2773 wait_window_ms,
2774 });
2775 }
2776 response
2777 })
2778 }),
2779 );
2780
2781 let executor = Arc::clone(executor);
2782 let completion_tx = completion_tx.clone();
2783 let poll_touch_tx = poll_touch_tx.clone();
2784 let root_for_task = root.clone();
2785 tokio::spawn(async move {
2786 let spawn_response = await_executor_response(spawn_rx, request_id.clone()).await;
2787 let spawn_control = spawn_control_rx.await;
2788 match spawn_control {
2789 Ok(BashSpawnControl::Immediate) => {
2790 let text = spawn_text_rx.await.unwrap_or_else(|_| {
2791 crate::subc_format::format_response_with_context(
2792 "bash",
2793 &spawn_response,
2794 &format_context,
2795 )
2796 });
2797 let result = ToolCallResult {
2798 text,
2799 response: spawn_response,
2800 };
2801 let fatal = response_is_fatal_panic(&result.response);
2802 send_bash_deferred_completion(
2803 &completion_tx,
2804 route_channel,
2805 corr,
2806 flags,
2807 ver,
2808 root_for_task,
2809 request_id,
2810 Some(result),
2811 fatal,
2812 )
2813 .await;
2814 }
2815 Ok(BashSpawnControl::Foreground {
2816 task_id,
2817 session_id,
2818 project_root,
2819 storage_dir,
2820 deadline,
2821 block_to_completion,
2822 timeout,
2823 wait_window_ms,
2824 }) => {
2825 run_deferred_bash_wait(
2826 executor,
2827 completion_tx,
2828 poll_touch_tx,
2829 route_channel,
2830 corr,
2831 flags,
2832 ver,
2833 root_for_task,
2834 request_id,
2835 task_id,
2836 session_id,
2837 project_root,
2838 storage_dir,
2839 deadline,
2840 block_to_completion,
2841 timeout,
2842 wait_window_ms,
2843 format_context,
2844 cancel,
2845 )
2846 .await;
2847 }
2848 Err(_) => {
2849 let result = bash_result_from_response(spawn_response, &format_context);
2850 let fatal = response_is_fatal_panic(&result.response);
2851 send_bash_deferred_completion(
2852 &completion_tx,
2853 route_channel,
2854 corr,
2855 flags,
2856 ver,
2857 root_for_task,
2858 request_id,
2859 Some(result),
2860 fatal,
2861 )
2862 .await;
2863 }
2864 }
2865 });
2866}
2867
2868#[allow(clippy::too_many_arguments)]
2869async fn run_deferred_bash_wait(
2870 executor: Arc<Executor>,
2871 completion_tx: mpsc::Sender<BashDeferredCompletion>,
2872 poll_touch_tx: mpsc::Sender<ProjectRootId>,
2873 route_channel: u16,
2874 corr: u64,
2875 flags: Flags,
2876 ver: u8,
2877 root: ProjectRootId,
2878 request_id: String,
2879 task_id: String,
2880 session_id: String,
2881 project_root: Option<PathBuf>,
2882 storage_dir: PathBuf,
2883 deadline: Instant,
2884 block_to_completion: bool,
2885 timeout: Option<u64>,
2886 wait_window_ms: u64,
2887 format_context: crate::subc_format::FormatContext,
2888 cancel: BashWaitCancel,
2889) {
2890 loop {
2891 tokio::select! {
2892 _ = cancel.cancelled() => {
2893 send_bash_deferred_completion(
2894 &completion_tx,
2895 route_channel,
2896 corr,
2897 flags,
2898 ver,
2899 root,
2900 request_id,
2901 None,
2902 false,
2903 )
2904 .await;
2905 break;
2906 }
2907 _ = tokio::time::sleep(PENDING_POLL_INTERVAL) => {
2908 let (poll_control_tx, poll_control_rx) = oneshot::channel::<BashPollControl>();
2909 let (poll_text_tx, poll_text_rx) = oneshot::channel::<String>();
2910 let root_for_poll = root.clone();
2911 let request_id_for_poll = request_id.clone();
2912 let task_id_for_poll = task_id.clone();
2913 let session_for_poll = session_id.clone();
2914 let storage_for_poll = storage_dir.clone();
2915 let project_root_for_poll = project_root.clone();
2916 let format_context_for_poll = format_context.clone();
2917 let poll_rx = executor.submit_async(
2918 root_for_poll,
2919 Lane::PureRead,
2920 request_id.clone(),
2921 Box::new(move |ctx| {
2922 log_ctx::with_session(Some(session_for_poll.clone()), || {
2923 let mut poll_text_tx = Some(poll_text_tx);
2924 let mut poll_control_tx = Some(poll_control_tx);
2925
2926 let Some(snapshot) = crate::commands::bash_orchestrate::poll_bash_status(
2927 ctx,
2928 &task_id_for_poll,
2929 &session_for_poll,
2930 project_root_for_poll.as_deref(),
2931 &storage_for_poll,
2932 crate::bash_background::output::RUNNING_OUTPUT_PREVIEW_BYTES,
2933 ) else {
2934 return finish_bash_poll_done(
2935 crate::commands::bash_orchestrate::task_not_found_response(
2936 &request_id_for_poll,
2937 &task_id_for_poll,
2938 ),
2939 ctx,
2940 &session_for_poll,
2941 &format_context_for_poll,
2942 &mut poll_text_tx,
2943 &mut poll_control_tx,
2944 );
2945 };
2946
2947 match crate::commands::bash_orchestrate::decide_bash_step(
2948 snapshot,
2949 deadline,
2950 block_to_completion,
2951 Instant::now(),
2952 &request_id_for_poll,
2953 ) {
2954 crate::commands::bash_orchestrate::BashStep::Done(response) => {
2955 finish_bash_poll_done(
2956 response,
2957 ctx,
2958 &session_for_poll,
2959 &format_context_for_poll,
2960 &mut poll_text_tx,
2961 &mut poll_control_tx,
2962 )
2963 }
2964 crate::commands::bash_orchestrate::BashStep::Promote => {
2965 if let Some(tx) = poll_control_tx.take() {
2966 let _ = tx.send(BashPollControl::Promote);
2967 }
2968 Response::success(
2969 request_id_for_poll,
2970 json!({ "subc_bash_step": "promote" }),
2971 )
2972 }
2973 crate::commands::bash_orchestrate::BashStep::Wait => {
2974 if let Some(tx) = poll_control_tx.take() {
2975 let _ = tx.send(BashPollControl::Wait);
2976 }
2977 Response::success(
2978 request_id_for_poll,
2979 json!({ "subc_bash_step": "wait" }),
2980 )
2981 }
2982 }
2983 })
2984 }),
2985 );
2986 let poll_response = await_executor_response(poll_rx, request_id.clone()).await;
2987 let _ = poll_touch_tx.send(root.clone()).await;
2988 match poll_control_rx.await.unwrap_or(BashPollControl::Done) {
2989 BashPollControl::Done => {
2990 let text = poll_text_rx.await.unwrap_or_else(|_| {
2991 crate::subc_format::format_response_with_context(
2992 "bash",
2993 &poll_response,
2994 &format_context,
2995 )
2996 });
2997 let result = ToolCallResult {
2998 text,
2999 response: poll_response,
3000 };
3001 let fatal = response_is_fatal_panic(&result.response);
3002 send_bash_deferred_completion(
3003 &completion_tx,
3004 route_channel,
3005 corr,
3006 flags,
3007 ver,
3008 root,
3009 request_id,
3010 Some(result),
3011 fatal,
3012 )
3013 .await;
3014 break;
3015 }
3016 BashPollControl::Promote => {
3017 let result = submit_bash_promote(
3018 &executor,
3019 root.clone(),
3020 request_id.clone(),
3021 task_id.clone(),
3022 session_id.clone(),
3023 timeout,
3024 wait_window_ms,
3025 format_context.clone(),
3026 )
3027 .await;
3028 let fatal = response_is_fatal_panic(&result.response);
3029 send_bash_deferred_completion(
3030 &completion_tx,
3031 route_channel,
3032 corr,
3033 flags,
3034 ver,
3035 root,
3036 request_id,
3037 Some(result),
3038 fatal,
3039 )
3040 .await;
3041 break;
3042 }
3043 BashPollControl::Wait => {}
3044 }
3045 }
3046 }
3047 }
3048}
3049
3050async fn submit_bash_promote(
3051 executor: &Arc<Executor>,
3052 root: ProjectRootId,
3053 request_id: String,
3054 task_id: String,
3055 session_id: String,
3056 timeout: Option<u64>,
3057 wait_window_ms: u64,
3058 format_context: crate::subc_format::FormatContext,
3059) -> ToolCallResult {
3060 let (text_tx, text_rx) = oneshot::channel::<String>();
3061 let request_id_for_promote = request_id.clone();
3062 let task_id_for_promote = task_id.clone();
3063 let session_for_promote = session_id.clone();
3064 let format_context_for_promote = format_context.clone();
3065 let promote_rx = executor.submit_async(
3066 root,
3067 Lane::Mutating,
3068 request_id.clone(),
3069 Box::new(move |ctx| {
3070 log_ctx::with_session(Some(session_for_promote.clone()), || {
3071 let response = if let Some(value) =
3072 std::env::var_os("AFT_TEST_FORCE_SUBC_BASH_PROMOTE_ERROR")
3073 {
3074 if value.to_string_lossy() == "panic" {
3075 panic!("forced subc bash promote panic");
3076 }
3077 Response::error(
3078 &request_id_for_promote,
3079 "execution_failed",
3080 "forced subc bash promote failure",
3081 )
3082 } else {
3083 crate::commands::bash_orchestrate::promote_bash(
3084 ctx,
3085 &task_id_for_promote,
3086 &session_for_promote,
3087 ctx.config().project_root.as_deref(),
3088 timeout,
3089 wait_window_ms,
3090 &request_id_for_promote,
3091 )
3092 };
3093 let result = finalized_bash_result(
3094 response,
3095 ctx,
3096 &session_for_promote,
3097 &format_context_for_promote,
3098 true,
3099 );
3100 let ToolCallResult { text, response } = result;
3101 let _ = text_tx.send(text);
3102 response
3103 })
3104 }),
3105 );
3106 let response = await_executor_response(promote_rx, request_id).await;
3107 let text = text_rx.await.unwrap_or_else(|_| {
3108 crate::subc_format::format_response_with_context("bash", &response, &format_context)
3109 });
3110 ToolCallResult { text, response }
3111}
3112
3113#[allow(clippy::too_many_arguments)]
3114async fn send_bash_deferred_completion(
3115 completion_tx: &mpsc::Sender<BashDeferredCompletion>,
3116 channel: u16,
3117 corr: u64,
3118 flags: Flags,
3119 ver: u8,
3120 root: ProjectRootId,
3121 request_id: String,
3122 result: Option<ToolCallResult>,
3123 fatal: bool,
3124) {
3125 let _ = completion_tx
3126 .send(BashDeferredCompletion {
3127 channel,
3128 corr,
3129 flags,
3130 ver,
3131 root,
3132 request_id,
3133 result,
3134 fatal,
3135 })
3136 .await;
3137}
3138
3139async fn handle_bash_deferred_completion(
3140 tx: &mpsc::Sender<Frame>,
3141 done: BashDeferredCompletion,
3142 routes: &HashMap<RouteChannel, RouteIdentity>,
3143 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
3144 route_bash_cancels: &mut HashMap<RouteChannel, RouteBashCancel>,
3145 shutdown: &Arc<Notify>,
3146) -> Result<(), SubcError> {
3147 if let Some(meta) = live_roots.get_mut(&done.root) {
3148 meta.active_bash_waits = meta.active_bash_waits.saturating_sub(1);
3149 meta.touch();
3150 }
3151 let route_id = route_key(done.channel);
3152 let remove_route_cancel = if let Some(cancel) = route_bash_cancels.get_mut(&route_id) {
3153 cancel.active_waits = cancel.active_waits.saturating_sub(1);
3154 cancel.active_waits == 0
3155 } else {
3156 false
3157 };
3158 if remove_route_cancel {
3159 route_bash_cancels.remove(&route_id);
3160 }
3161
3162 if let Some(result) = done.result {
3163 if routes.contains_key(&route_id) {
3164 let frame =
3165 build_tool_response_frame(done.ver, done.channel, done.corr, done.flags, &result)?;
3166 send_frame(tx, frame).await?;
3167 } else {
3168 log::debug!(
3169 "subc attach: dropping deferred bash response {} for unbound route {}",
3170 done.request_id,
3171 done.channel
3172 );
3173 }
3174 } else {
3175 log::debug!(
3176 "subc attach: deferred bash wait {} cancelled before delivery on route {}",
3177 done.request_id,
3178 done.channel
3179 );
3180 }
3181
3182 if done.fatal {
3183 signal_fatal_teardown(tx, Some(done.channel), done.ver, done.corr, shutdown).await;
3184 }
3185 Ok(())
3186}
3187fn submit_maintenance_drain(
3188 executor: &Arc<Executor>,
3189 root_id: ProjectRootId,
3190 bg_sessions_to_check: Vec<(String, u64)>,
3191 completion_tx: &mpsc::Sender<MaintenanceCompletion>,
3192) {
3193 let request_id = format!(
3194 "subc-maintenance-drain-{}",
3195 root_id.as_path().to_string_lossy()
3196 );
3197 let response_id = request_id.clone();
3198 let completion_root_id = root_id.clone();
3199 let (empty_bg_sessions_tx, empty_bg_sessions_rx) = oneshot::channel::<Vec<(String, u64)>>();
3200 let rx = executor.submit_async(
3201 root_id,
3202 Lane::Mutating,
3203 request_id.clone(),
3204 Box::new(move |ctx| {
3205 runtime_drain::drain_configure_warning_events(ctx);
3206 runtime_drain::drain_search_index_events(ctx);
3207 runtime_drain::drain_callgraph_store_events(ctx);
3208 runtime_drain::drain_semantic_index_events(ctx);
3209 runtime_drain::drain_semantic_refresh_events(ctx);
3210 runtime_drain::drain_inspect_events(ctx);
3211 runtime_drain::drain_watcher_events(ctx);
3212 runtime_drain::drain_lsp_events(ctx);
3213 let empty_bg_sessions = bg_sessions_to_check
3214 .into_iter()
3215 .filter(|(session, _)| {
3216 !ctx.bash_background()
3217 .has_completions_for_session(Some(session.as_str()))
3218 })
3219 .collect();
3220 let _ = empty_bg_sessions_tx.send(empty_bg_sessions);
3221 Response::success(response_id, json!({ "drained": true }))
3222 }),
3223 );
3224 let completion_tx = completion_tx.clone();
3225 tokio::spawn(async move {
3226 let response = await_executor_response(rx, request_id).await;
3227 let empty_bg_sessions = empty_bg_sessions_rx.await.unwrap_or_default();
3228 let _ = completion_tx
3229 .send(MaintenanceCompletion {
3230 root_id: completion_root_id,
3231 response,
3232 empty_bg_sessions,
3233 })
3234 .await;
3235 });
3236}
3237
3238async fn await_executor_response(rx: oneshot::Receiver<Response>, request_id: String) -> Response {
3239 rx.await
3240 .unwrap_or_else(|_| Response::error(request_id, "internal_error", "executor dropped"))
3241}
3242
3243fn flat_tool_response(response: &crate::protocol::Response, text: &str) -> Value {
3253 let mut obj = serde_json::Map::new();
3254 obj.insert("id".to_string(), Value::String(response.id.clone()));
3255 obj.insert("success".to_string(), Value::Bool(response.success));
3256 if let Some(data) = response.data.as_object() {
3257 for (key, value) in data {
3258 obj.insert(key.clone(), value.clone());
3259 }
3260 }
3261 obj.insert("text".to_string(), Value::String(text.to_string()));
3262 Value::Object(obj)
3263}
3264
3265fn build_tool_response_frame(
3266 ver: u8,
3267 route_channel: u16,
3268 corr: u64,
3269 flags: Flags,
3270 result: &ToolCallResult,
3271) -> Result<Frame, SubcError> {
3272 let is_error = !result.response.success;
3273 let payload = json!({
3284 "content": [{ "type": "text", "text": result.text.as_str() }],
3285 "isError": is_error,
3286 "structuredContent": flat_tool_response(&result.response, &result.text),
3287 });
3288 let body = serde_json::to_vec(&payload).map_err(SubcError::Json)?;
3289
3290 Frame::build_with_version(ver, FrameType::Response, flags, route_channel, corr, body)
3291 .map_err(SubcError::FrameBuild)
3292}
3293
3294fn build_error_frame(
3295 ver: u8,
3296 channel: u16,
3297 corr: u64,
3298 flags: Flags,
3299 code: &str,
3300 message: &str,
3301) -> Result<Frame, SubcError> {
3302 let body = serde_json::to_vec(&ErrorBody {
3303 code: code.to_string(),
3304 message: message.to_string(),
3305 })
3306 .map_err(SubcError::Json)?;
3307 Frame::build_with_version(ver, FrameType::Error, flags, channel, corr, body)
3308 .map_err(SubcError::FrameBuild)
3309}
3310
3311fn build_goodbye_frame(ver: u8, channel: u16, corr: u64) -> Result<Frame, SubcError> {
3312 Frame::build_with_version(
3313 ver,
3314 FrameType::Goodbye,
3315 control_flags(),
3316 channel,
3317 corr,
3318 Vec::new(),
3319 )
3320 .map_err(SubcError::FrameBuild)
3321}
3322
3323async fn signal_fatal_teardown(
3324 tx: &mpsc::Sender<Frame>,
3325 route_channel: Option<u16>,
3326 ver: u8,
3327 corr: u64,
3328 shutdown: &Arc<Notify>,
3329) {
3330 if let Some(route_channel) = route_channel {
3331 if let Ok(frame) = build_goodbye_frame(ver, route_channel, corr) {
3332 if let Err(error) = send_frame(tx, frame).await {
3333 log::warn!(
3334 "subc attach: failed to queue fatal route Goodbye for route {route_channel}: {error}"
3335 );
3336 }
3337 }
3338 }
3339 if let Ok(frame) = build_goodbye_frame(ver, 0, 0) {
3340 if let Err(error) = send_frame(tx, frame).await {
3341 log::warn!("subc attach: failed to queue fatal channel-0 Goodbye: {error}");
3342 }
3343 }
3344 shutdown.notify_one();
3345}
3346
3347fn response_message(response: &Response, fallback: &str) -> String {
3348 response
3349 .data
3350 .get("message")
3351 .and_then(Value::as_str)
3352 .map(ToOwned::to_owned)
3353 .unwrap_or_else(|| fallback.to_string())
3354}
3355
3356fn response_is_fatal_panic(response: &Response) -> bool {
3357 !response.success && response.data.get("code").and_then(Value::as_str) == Some("actor_fatal")
3358}
3359
3360fn bash_denied_untrusted_response(request_id: impl Into<String>) -> Response {
3361 Response::error(
3362 request_id.into(),
3363 "bash_denied_untrusted",
3364 "remote/MCP-facade binds cannot run shell commands",
3365 )
3366}
3367
/// True for `bash` itself and for any name that starts with `bash_`.
fn is_bash_family_tool(name: &str) -> bool {
    match name.strip_prefix("bash") {
        // Either nothing follows `bash`, or the next character is `_`.
        Some(rest) => rest.is_empty() || rest.starts_with('_'),
        None => false,
    }
}
3371
/// True when `name` is one of the fixed agent-facing tool names listed below
/// (the same names `build_manifest` advertises).
fn is_subc_agent_core_tool(name: &str) -> bool {
    const CORE_TOOLS: &[&str] = &[
        "status",
        "bash",
        "read",
        "write",
        "edit",
        "apply_patch",
        "grep",
        "glob",
        "search",
        "outline",
        "zoom",
        "inspect",
        "callgraph",
        "conflicts",
        "ast_search",
        "ast_replace",
        "delete",
        "move",
        "import",
        "refactor",
        "safety",
    ];
    CORE_TOOLS.contains(&name)
}
3398
/// True for the two native completion-plumbing commands,
/// `bash_drain_completions` and `bash_ack_completions`.
fn is_subc_native_plumbing_tool(name: &str) -> bool {
    name == "bash_drain_completions" || name == "bash_ack_completions"
}
3419
3420fn command_lane(command: &str) -> Lane {
3421 match command {
3422 "ping"
3423 | "version"
3424 | "echo"
3425 | "bash_drain_completions"
3426 | "bash_regex_match"
3427 | "db_get_state"
3428 | "db_get_host_state"
3429 | "read"
3430 | "undo_preview"
3431 | "edit_history"
3432 | "checkpoint_paths"
3433 | "list_checkpoints"
3434 | "conflicts"
3435 | "glob"
3436 | "grep"
3437 | "git_conflicts"
3438 | "ast_search" => Lane::PureRead,
3439
3440 "bash_status" | "outline" | "zoom" => Lane::PureRead,
3444
3445 "status"
3446 | "inspect"
3447 | "lsp_diagnostics"
3448 | "lsp_inspect"
3449 | "lsp_hover"
3450 | "lsp_goto_definition"
3451 | "lsp_find_references"
3452 | "lsp_prepare_rename" => Lane::SerialLspStatus,
3453
3454 "semantic_search" | "search" | "callgraph" | "callers" | "impact" | "call_tree"
3455 | "trace_to" | "trace_to_symbol" | "trace_data" | "inspect_tier2_run" => Lane::HeavyInit,
3456
3457 "bash"
3458 | "bash_ack_completions"
3459 | "bash_notify"
3460 | "bash_unnotify"
3461 | "bash_promote"
3462 | "bash_kill"
3463 | "bash_write"
3464 | "db_set_state"
3465 | "db_set_host_state"
3466 | "undo"
3467 | "checkpoint"
3468 | "restore_checkpoint"
3469 | "write"
3470 | "delete_file"
3471 | "move_file"
3472 | "edit"
3473 | "edit_symbol"
3474 | "edit_match"
3475 | "batch"
3476 | "add_import"
3477 | "remove_import"
3478 | "organize_imports"
3479 | "configure"
3480 | "move_symbol"
3481 | "extract_function"
3482 | "inline_symbol"
3483 | "ast_replace"
3484 | "lsp_rename"
3485 | "list_filters"
3486 | "trust_filter_project"
3487 | "untrust_filter_project"
3488 | "snapshot" => Lane::Mutating,
3489
3490 _ => Lane::Mutating,
3491 }
3492}
3493
3494#[derive(Deserialize)]
3495struct BgEventsProbe {
3496 op: Option<String>,
3497}
3498
3499#[derive(Debug, Deserialize)]
3500struct ToolCallRequest {
3501 name: String,
3502 #[serde(default)]
3503 arguments: Value,
3504}
3505
3506static SUBC_TOOL_SCHEMAS: LazyLock<serde_json::Map<String, Value>> = LazyLock::new(|| {
3507 serde_json::from_str(include_str!("subc_tool_schemas.json"))
3508 .unwrap_or_else(|e| panic!("subc_tool_schemas.json: {e}"))
3509});
3510
3511fn tool_schema(name: &str) -> Value {
3512 SUBC_TOOL_SCHEMAS.get(name).cloned().unwrap_or_else(|| {
3513 log::warn!(
3514 "subc build_manifest: missing embedded schema for tool {name:?}; using placeholder"
3515 );
3516 json!({ "type": "object" })
3517 })
3518}
3519
3520fn build_manifest() -> ModuleManifest {
3525 let tool = |name: &str, execution_mode: ExecutionMode| Tool {
3526 name: name.to_string(),
3527 execution_mode,
3528 schema: tool_schema(name),
3529 };
3530 ModuleManifest {
3537 module_id: "aft".to_string(),
3538 module_version: env!("CARGO_PKG_VERSION").to_string(),
3539 protocol_ver: PROTOCOL_VERSION,
3540 trust_tier: TrustTier::FirstParty,
3541 provides: vec![ProviderRole::ToolProvider {
3542 tools: vec![
3543 tool("status", ExecutionMode::Pure),
3544 tool("bash", ExecutionMode::Mutating),
3545 tool("read", ExecutionMode::Pure),
3546 tool("write", ExecutionMode::Mutating),
3547 tool("edit", ExecutionMode::Mutating),
3548 tool("apply_patch", ExecutionMode::Mutating),
3549 tool("grep", ExecutionMode::Pure),
3550 tool("glob", ExecutionMode::Pure),
3551 tool("search", ExecutionMode::Pure),
3552 tool("outline", ExecutionMode::Pure),
3553 tool("zoom", ExecutionMode::Pure),
3554 tool("inspect", ExecutionMode::Pure),
3555 tool("callgraph", ExecutionMode::Pure),
3556 tool("conflicts", ExecutionMode::Pure),
3557 tool("ast_search", ExecutionMode::Pure),
3558 tool("ast_replace", ExecutionMode::Mutating),
3559 tool("delete", ExecutionMode::Mutating),
3560 tool("move", ExecutionMode::Mutating),
3561 tool("import", ExecutionMode::Mutating),
3562 tool("refactor", ExecutionMode::Mutating),
3563 tool("safety", ExecutionMode::Mutating),
3564 ],
3565 identity_scope: vec![IdentityScope::Session, IdentityScope::Project],
3566 concurrency: Concurrency::ModuleManaged,
3567 emits_push: true,
3568 sub_supervises: true,
3569 }],
3570 consumes: Vec::new(),
3571 scheduled_tasks: Vec::new(),
3572 bindings: Bindings {
3573 storage: StorageBinding {
3574 kind: StorageKind::Sqlite,
3575 scope: StorageScope::Project,
3576 owns_schema: true,
3577 },
3578 vault_grants: Vec::new(),
3579 identity: IdentityBinding {
3580 requires: vec![IdentityScope::Project],
3581 optional: vec![IdentityScope::Session],
3582 },
3583 },
3584 }
3585}
3586
3587fn control_flags() -> Flags {
3588 Flags::new(false, Priority::Passive, false)
3589}
3590
3591#[cfg(test)]
3592mod tests {
3593 use super::*;
3594 use crate::bash_background::BgTaskStatus;
3595 use crate::protocol::{
3596 BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, ConfigureWarningsFrame,
3597 ProgressFrame, StatusChangedFrame,
3598 };
3599 use serde_json::json;
3600
3601 fn test_root(name: &str) -> (tempfile::TempDir, ProjectRootId) {
3602 let dir = tempfile::Builder::new()
3603 .prefix(name)
3604 .tempdir()
3605 .expect("temp root");
3606 let root = ProjectRootId::from_path(dir.path()).expect("project root id");
3607 (dir, root)
3608 }
3609
3610 fn status_frame(seq: u64) -> PushFrame {
3611 status_frame_with_session(seq, None)
3612 }
3613
3614 fn status_frame_with_session(seq: u64, session_id: Option<&str>) -> PushFrame {
3615 PushFrame::StatusChanged(StatusChangedFrame {
3616 frame_type: "status_changed",
3617 session_id: session_id.map(str::to_string),
3618 snapshot: json!({ "seq": seq }),
3619 })
3620 }
3621
3622 fn completion_frame(task_id: &str) -> PushFrame {
3623 completion_frame_with_session(task_id, "session-1")
3624 }
3625
3626 fn completion_frame_with_session(task_id: &str, session_id: &str) -> PushFrame {
3627 PushFrame::BashCompleted(BashCompletedFrame {
3628 frame_type: "bash_completed",
3629 task_id: task_id.to_string(),
3630 session_id: session_id.to_string(),
3631 status: BgTaskStatus::Completed,
3632 exit_code: Some(0),
3633 command: format!("echo {task_id}"),
3634 output_preview: String::new(),
3635 output_truncated: false,
3636 original_tokens: None,
3637 compressed_tokens: None,
3638 tokens_skipped: false,
3639 })
3640 }
3641
3642 fn long_running_frame(task_id: &str, elapsed_ms: u64) -> PushFrame {
3643 long_running_frame_with_session(task_id, "session-1", elapsed_ms)
3644 }
3645
3646 fn long_running_frame_with_session(
3647 task_id: &str,
3648 session_id: &str,
3649 elapsed_ms: u64,
3650 ) -> PushFrame {
3651 PushFrame::BashLongRunning(BashLongRunningFrame {
3652 frame_type: "bash_long_running",
3653 task_id: task_id.to_string(),
3654 session_id: session_id.to_string(),
3655 command: format!("sleep {elapsed_ms}"),
3656 elapsed_ms,
3657 })
3658 }
3659
3660 fn pattern_match_frame(session_id: &str) -> PushFrame {
3661 PushFrame::BashPatternMatch(BashPatternMatchFrame {
3662 frame_type: "bash_pattern_match",
3663 task_id: "task-pattern".to_string(),
3664 session_id: session_id.to_string(),
3665 watch_id: "watch-1".to_string(),
3666 match_text: "needle".to_string(),
3667 match_offset: 7,
3668 context: "haystack needle".to_string(),
3669 once: true,
3670 reason: "pattern_match",
3671 })
3672 }
3673
3674 fn configure_warnings_frame(session_id: Option<&str>) -> PushFrame {
3675 PushFrame::ConfigureWarnings(ConfigureWarningsFrame {
3676 frame_type: "configure_warnings",
3677 session_id: session_id.map(str::to_string),
3678 project_root: "/tmp/subc-test".to_string(),
3679 source_file_count: 0,
3680 warnings: Vec::new(),
3681 })
3682 }
3683
3684 fn route_identity(root: &ProjectRootId, session_id: &str) -> RouteIdentity {
3685 route_identity_with_trust(root, session_id, BindTrust::FirstParty)
3686 }
3687
3688 fn route_identity_with_trust(
3689 root: &ProjectRootId,
3690 session_id: &str,
3691 trust: BindTrust,
3692 ) -> RouteIdentity {
3693 RouteIdentity {
3694 root: root.clone(),
3695 project_root: root.as_path().to_path_buf(),
3696 harness: "opencode".to_string(),
3697 session: session_id.to_string(),
3698 trust,
3699 }
3700 }
3701
3702 fn progress_frame(request_id: &str, kind: ProgressKind, chunk: &str) -> PushFrame {
3703 PushFrame::Progress(ProgressFrame::new(request_id, kind, chunk))
3704 }
3705
3706 fn status_seq(frame: &PushFrame) -> Option<u64> {
3707 match frame {
3708 PushFrame::StatusChanged(status) => status.snapshot.get("seq").and_then(|v| v.as_u64()),
3709 _ => None,
3710 }
3711 }
3712
3713 fn completion_task(frame: &PushFrame) -> Option<&str> {
3714 match frame {
3715 PushFrame::BashCompleted(completion) => Some(completion.task_id.as_str()),
3716 _ => None,
3717 }
3718 }
3719
3720 fn push_frame_task_id(frame: &Frame) -> Option<String> {
3721 let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
3722 body.get("task_id")
3723 .and_then(serde_json::Value::as_str)
3724 .map(str::to_string)
3725 }
3726
/// Pins the principal-to-trust mapping: direct principals and the reserved modules
/// `llm-runner` and `aft` are first-party; every other reserved module, unverified
/// principals, and a missing principal are untrusted.
#[test]
fn trust_for_principal_matrix() {
    let reserved = |module_id: &str| {
        Some(Principal::Reserved {
            module_id: module_id.to_string(),
        })
    };

    assert_eq!(
        trust_for_principal(&Some(Principal::Direct)),
        BindTrust::FirstParty
    );
    assert_eq!(
        trust_for_principal(&reserved("llm-runner")),
        BindTrust::FirstParty
    );
    assert_eq!(trust_for_principal(&reserved("aft")), BindTrust::FirstParty);
    assert_eq!(
        trust_for_principal(&reserved("subc-mcp")),
        BindTrust::Untrusted
    );
    assert_eq!(
        trust_for_principal(&reserved("anything-unknown")),
        BindTrust::Untrusted
    );
    assert_eq!(
        trust_for_principal(&Some(Principal::Unverified)),
        BindTrust::Untrusted
    );
    assert_eq!(trust_for_principal(&None), BindTrust::Untrusted);
}
3763
/// Checks, per frame kind, which session a frame is attributed to and whether it is
/// classified as reliable: completion, pattern-match, and configure-warnings frames are
/// reliable; long-running, status, and progress frames are not.
#[test]
fn frame_classification_matches_push_delivery_contract() {
    // Pairs the two classifications so each frame kind is pinned by a single assertion.
    fn classify(frame: &PushFrame) -> (Option<&str>, bool) {
        (frame_session(frame), frame_is_reliable(frame))
    }

    let completion = completion_frame_with_session("done", "session-a");
    assert_eq!(classify(&completion), (Some("session-a"), true));

    let long_running = long_running_frame_with_session("long", "session-b", 42);
    assert_eq!(classify(&long_running), (Some("session-b"), false));

    let pattern_match = pattern_match_frame("session-c");
    assert_eq!(classify(&pattern_match), (Some("session-c"), true));

    let tagged_warnings = configure_warnings_frame(Some("session-d"));
    assert_eq!(classify(&tagged_warnings), (Some("session-d"), true));

    let untagged_warnings = configure_warnings_frame(None);
    assert_eq!(classify(&untagged_warnings), (None, true));

    let tagged_status = status_frame_with_session(1, Some("session-e"));
    assert_eq!(classify(&tagged_status), (Some("session-e"), false));

    let project_status = status_frame(2);
    assert_eq!(classify(&project_status), (None, false));

    let progress = progress_frame("request-1", ProgressKind::Stdout, "chunk");
    assert_eq!(classify(&progress), (None, false));
}
3798
/// With two sessions bound to the same root, a session-tagged reliable frame reaches only
/// its own session's channel, while an untagged lossy status frame reaches both channels.
#[test]
fn fan_out_push_frame_routes_session_scoped_and_project_scoped_frames() {
    let (_root_dir, root) = test_root("subc-session-routing-root");
    let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(8);

    let first_identity = route_identity(&root, "session-1");
    let second_identity = route_identity(&root, "session-2");

    let mut routes = HashMap::new();
    routes.insert(route_key(1), first_identity.clone());
    routes.insert(route_key(2), second_identity.clone());

    let mut root_channels = HashMap::new();
    root_channels.insert(root.clone(), HashSet::from([route_key(1), route_key(2)]));

    let mut session_identity = HashMap::new();
    remember_session_identity(&mut session_identity, &first_identity);
    remember_session_identity(&mut session_identity, &second_identity);

    let mut retry_buffer = HashMap::new();
    let mut push_buffer = HashMap::new();

    // Session-scoped reliable frame: exactly one channel, nothing buffered.
    let session_frame = completion_frame_with_session("session-only", "session-1");
    let session_result = fan_out_reliable_push_frame(
        &writer_tx,
        &routes,
        &root_channels,
        &session_identity,
        &mut retry_buffer,
        &mut push_buffer,
        &root,
        &session_frame,
    );
    let one_channel_one_frame = FanOutResult {
        matched_channels: 1,
        sent_frames: 1,
    };
    assert_eq!(session_result, one_channel_one_frame);
    assert!(retry_buffer.is_empty());
    assert!(push_buffer.is_empty());

    let session_push = writer_rx.try_recv().expect("session push queued");
    assert_eq!(session_push.header.ty, FrameType::Push);
    assert_eq!(session_push.header.channel, 1);
    assert!(
        writer_rx.try_recv().is_err(),
        "session-scoped frame must not broadcast to sibling sessions"
    );

    // Project-scoped lossy frame: both channels receive it.
    let project_frame = status_frame(9);
    let project_result =
        fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &project_frame);
    let two_channels_two_frames = FanOutResult {
        matched_channels: 2,
        sent_frames: 2,
    };
    assert_eq!(project_result, two_channels_two_frames);

    let mut project_channels = HashSet::new();
    for missing in ["first project push", "second project push"] {
        let pushed = writer_rx.try_recv().expect(missing);
        project_channels.insert(pushed.header.channel);
    }
    assert_eq!(project_channels, HashSet::from([1, 2]));
    assert!(writer_rx.try_recv().is_err());
}
3869
/// Buffering three frames more than `PUSH_BUFFER_MAX_PER_KEY` under one replay key keeps
/// the buffer at the cap and discards the three oldest frames.
#[test]
fn push_buffer_drops_oldest_per_replay_key() {
    let (_root_dir, root) = test_root("subc-buffer-bound-root");
    let key = ReplayKey {
        root,
        harness: String::from("opencode"),
        session: String::from("session-1"),
    };
    let mut push_buffer = HashMap::new();
    let total = PUSH_BUFFER_MAX_PER_KEY + 3;

    for index in 0..total {
        let task_id = format!("task-{index}");
        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(&task_id));
    }

    let buffered = push_buffer.get(&key).expect("buffer entry");
    assert_eq!(buffered.len(), PUSH_BUFFER_MAX_PER_KEY);

    let tasks: Vec<&str> = buffered.iter().filter_map(completion_task).collect();
    let newest = format!("task-{}", total - 1);
    assert_eq!(tasks.first().copied(), Some("task-3"));
    assert_eq!(tasks.last().copied(), Some(newest.as_str()));
}
3902
/// Replaying a key's buffered frames onto a first-party channel sends them in buffered
/// order as push frames on that channel and removes the key from the buffer.
#[test]
fn replay_buffered_push_frames_drains_to_bound_channel() {
    let (_root_dir, root) = test_root("subc-buffer-replay-root");
    let key = ReplayKey {
        root,
        harness: String::from("opencode"),
        session: String::from("session-1"),
    };
    let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);

    let mut push_buffer = HashMap::new();
    for task in ["task-a", "task-b"] {
        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(task));
    }

    let replayed = replay_buffered_push_frames(
        &writer_tx,
        route_key(3),
        &mut push_buffer,
        &key,
        BindTrust::FirstParty,
    );
    assert_eq!(replayed, 2);
    assert!(!push_buffer.contains_key(&key));

    for expected_task in ["task-a", "task-b"] {
        let frame = writer_rx.try_recv().expect("replayed push");
        assert_eq!(frame.header.ty, FrameType::Push);
        assert_eq!(frame.header.channel, 3);
        assert_eq!(push_frame_task_id(&frame).as_deref(), Some(expected_task));
    }
    assert!(writer_rx.try_recv().is_err());
}
3935
/// Replaying onto an untrusted route sends no buffered bash completion frame, yet the
/// key's buffer entry is still removed.
#[test]
fn replay_buffered_push_frames_skips_bash_for_untrusted_route() {
    let (_root_dir, root) = test_root("subc-buffer-replay-untrusted-root");
    let key = ReplayKey {
        root,
        harness: String::from("mcp"),
        session: String::from("session-1"),
    };
    let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);

    let mut push_buffer = HashMap::new();
    buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-a"));

    let sent = replay_buffered_push_frames(
        &writer_tx,
        route_key(3),
        &mut push_buffer,
        &key,
        BindTrust::Untrusted,
    );

    assert_eq!(sent, 0);
    assert!(!push_buffer.contains_key(&key));
    assert!(writer_rx.try_recv().is_err());
}
3960
/// Coalescing keeps every completion frame in arrival order, but only the latest status
/// frame per root and only the latest long-running frame for a task.
#[test]
fn coalesce_push_batch_collapses_lossy_and_preserves_reliable_fifo() {
    let (_root_dir, root) = test_root("subc-coalesce-root");
    let (_other_dir, other_root) = test_root("subc-coalesce-other");

    let batch = vec![
        (root.clone(), status_frame(1)),
        (root.clone(), completion_frame("task-1")),
        (root.clone(), status_frame(2)),
        (root.clone(), completion_frame("task-2")),
        (root.clone(), long_running_frame("long-task", 100)),
        (root.clone(), long_running_frame("long-task", 200)),
        (other_root.clone(), status_frame(9)),
    ];
    let output = coalesce_push_batch(batch);

    // Status sequence numbers that survived coalescing for one particular root.
    let statuses_for = |wanted: &ProjectRootId| -> Vec<u64> {
        output
            .iter()
            .filter(|(output_root, _)| output_root == wanted)
            .filter_map(|(_, frame)| status_seq(frame))
            .collect()
    };

    let mut completion_tasks = Vec::new();
    let mut long_running_elapsed = Vec::new();
    for (_, frame) in output.iter() {
        if let Some(task) = completion_task(frame) {
            completion_tasks.push(task);
        }
        if let PushFrame::BashLongRunning(long_running) = frame {
            long_running_elapsed.push(long_running.elapsed_ms);
        }
    }

    assert_eq!(completion_tasks, vec!["task-1", "task-2"]);
    assert_eq!(statuses_for(&root), vec![2]);
    assert_eq!(statuses_for(&other_root), vec![9]);
    assert_eq!(long_running_elapsed, vec![200]);
}
4005
/// Progress frames are coalesced per (request, stream kind): a later stdout chunk for
/// `request-1` replaces the earlier one, while its stderr chunk and `request-2`'s stdout
/// chunk are kept.
#[test]
fn coalesce_push_batch_keeps_progress_stream_keys_separate() {
    // Stable text label for a stream kind, used to compare results as plain tuples.
    fn kind_label(kind: &ProgressKind) -> &'static str {
        match kind {
            ProgressKind::Stdout => "stdout",
            ProgressKind::Stderr => "stderr",
        }
    }

    let (_root_dir, root) = test_root("subc-progress-coalesce-root");

    let batch = vec![
        (
            root.clone(),
            progress_frame("request-1", ProgressKind::Stdout, "old stdout"),
        ),
        (
            root.clone(),
            progress_frame("request-1", ProgressKind::Stderr, "stderr"),
        ),
        (
            root.clone(),
            progress_frame("request-2", ProgressKind::Stdout, "other stdout"),
        ),
        (
            root.clone(),
            progress_frame("request-1", ProgressKind::Stdout, "new stdout"),
        ),
    ];
    let output = coalesce_push_batch(batch);

    let mut progress = Vec::new();
    for (_, frame) in output.iter() {
        if let PushFrame::Progress(update) = frame {
            progress.push((
                update.request_id.as_str(),
                kind_label(&update.kind),
                update.chunk.as_str(),
            ));
        }
    }

    let expected = vec![
        ("request-1", "stderr", "stderr"),
        ("request-2", "stdout", "other stdout"),
        ("request-1", "stdout", "new stdout"),
    ];
    assert_eq!(progress, expected);
}
4053
/// With a lossy channel of capacity one, the sender queues the first status frame, drops
/// the second, still delivers a completion frame through the reliable channel, and
/// returns promptly throughout.
#[test]
fn progress_sender_keeps_reliable_off_saturated_lossy_funnel_without_blocking() {
    let (_root_dir, root) = test_root("subc-push-full-root");
    let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1);
    let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
    let senders = PushSenders {
        lossy_tx,
        reliable_tx,
    };
    let sender = progress_sender_for_root(senders, root.clone());

    let started = Instant::now();
    sender(status_frame(1));
    sender(status_frame(2));
    sender(completion_frame("reliable-after-lossy-full"));
    let elapsed = started.elapsed();
    assert!(
        elapsed < Duration::from_millis(50),
        "saturated push sender must return immediately"
    );

    // Lossy side: only the first status frame made it into the queue.
    let (lossy_root, lossy_frame) = lossy_rx.try_recv().expect("first lossy frame queued");
    assert_eq!(lossy_root, root);
    assert_eq!(status_seq(&lossy_frame), Some(1));
    assert!(
        lossy_rx.try_recv().is_err(),
        "second lossy frame should be dropped"
    );

    // Reliable side: the completion frame arrived despite the full lossy queue.
    let (reliable_root, reliable_frame) = reliable_rx
        .try_recv()
        .expect("reliable frame bypasses lossy backpressure");
    assert_eq!(reliable_root, root);
    assert_eq!(
        completion_task(&reliable_frame),
        Some("reliable-after-lossy-full")
    );
    assert!(reliable_rx.try_recv().is_err());
}
4095
/// When the writer queue is already full, a lossy fan-out matches its channel but sends
/// nothing, returns promptly, and leaves the previously queued frame untouched.
#[test]
fn fan_out_lossy_push_frame_drops_when_writer_is_full_without_blocking() {
    let (_root_dir, root) = test_root("subc-writer-full-root");
    let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
    let filler = Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap();
    writer_tx.try_send(filler).expect("prefill writer queue");

    let mut root_channels = HashMap::new();
    root_channels.insert(root.clone(), HashSet::from([route_key(7)]));
    let routes = HashMap::new();
    let frame = status_frame(1);

    let started = Instant::now();
    let result = fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &frame);
    let elapsed = started.elapsed();
    assert!(
        elapsed < Duration::from_millis(50),
        "saturated writer fan-out must return immediately"
    );

    let matched_but_unsent = FanOutResult {
        matched_channels: 1,
        sent_frames: 0,
    };
    assert_eq!(result, matched_but_unsent);

    let queued = writer_rx
        .try_recv()
        .expect("prefilled frame remains queued");
    assert_eq!(queued.header.ty, FrameType::Ping);
    assert!(
        writer_rx.try_recv().is_err(),
        "push should be dropped on full writer"
    );
}
4132
/// A reliable frame that hits a full writer queue is parked in the channel's retry buffer
/// (not the detach/replay buffer) and is sent once the queue has room and the retry
/// buffer is drained.
#[test]
fn reliable_push_backpressure_buffers_and_retries_on_tick() {
    let (_root_dir, root) = test_root("subc-retry-buffer-root");
    let channel = route_key(9);
    let identity = route_identity(&root, "session-1");
    let key = ReplayKey::from_identity(&identity);

    let mut routes = HashMap::new();
    routes.insert(channel, identity.clone());
    let mut root_channels = HashMap::new();
    root_channels.insert(root.clone(), HashSet::from([channel]));
    let mut session_identity = HashMap::new();
    remember_session_identity(&mut session_identity, &identity);
    let mut retry_buffer = HashMap::new();
    let mut push_buffer = HashMap::new();

    let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
    let filler = Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap();
    writer_tx.try_send(filler).expect("prefill writer queue");

    let frame = completion_frame("retry-task");
    let result = fan_out_reliable_push_frame(
        &writer_tx,
        &routes,
        &root_channels,
        &session_identity,
        &mut retry_buffer,
        &mut push_buffer,
        &root,
        &frame,
    );

    let matched_but_unsent = FanOutResult {
        matched_channels: 1,
        sent_frames: 0,
    };
    assert_eq!(result, matched_but_unsent);
    assert!(push_buffer.is_empty());
    assert_eq!(retry_buffer.get(&channel).map(VecDeque::len), Some(1));
    assert_eq!(&retry_buffer[&channel][0].0, &key);

    // Free the single writer slot, then drain the retry buffer into it.
    let queued = writer_rx.try_recv().expect("prefilled frame");
    assert_eq!(queued.header.ty, FrameType::Ping);
    let drained = drain_retry_buffer_for_channel(&writer_tx, channel, &mut retry_buffer);
    assert_eq!(drained, 1);

    let retried = writer_rx.try_recv().expect("retried reliable push");
    assert_eq!(retried.header.ty, FrameType::Push);
    assert_eq!(retried.header.channel, 9);
    assert_eq!(push_frame_task_id(&retried).as_deref(), Some("retry-task"));
    assert!(!retry_buffer.contains_key(&channel));
}
4185
/// While a channel has a frame waiting in its retry buffer, a newer reliable frame is
/// queued behind it rather than sent directly, even if the writer has room; draining
/// then delivers them in the original order.
#[test]
fn reliable_push_fifo_gates_new_frames_behind_retry_buffer() {
    let (_root_dir, root) = test_root("subc-retry-fifo-root");
    let channel = route_key(9);
    let identity = route_identity(&root, "session-1");

    let mut routes = HashMap::new();
    routes.insert(channel, identity.clone());
    let mut root_channels = HashMap::new();
    root_channels.insert(root.clone(), HashSet::from([channel]));
    let mut session_identity = HashMap::new();
    remember_session_identity(&mut session_identity, &identity);
    let mut retry_buffer = HashMap::new();
    let mut push_buffer = HashMap::new();

    let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
    let filler = Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap();
    writer_tx.try_send(filler).expect("prefill writer queue");

    let first = completion_frame("fifo-1");
    let second = completion_frame("fifo-2");

    // The first frame meets a full writer queue and lands in the retry buffer.
    let _ = fan_out_reliable_push_frame(
        &writer_tx,
        &routes,
        &root_channels,
        &session_identity,
        &mut retry_buffer,
        &mut push_buffer,
        &root,
        &first,
    );
    let queued = writer_rx.try_recv().expect("free writer capacity");
    assert_eq!(queued.header.ty, FrameType::Ping);

    // The writer now has room, but the second frame must still wait its turn.
    let _ = fan_out_reliable_push_frame(
        &writer_tx,
        &routes,
        &root_channels,
        &session_identity,
        &mut retry_buffer,
        &mut push_buffer,
        &root,
        &second,
    );
    assert!(
        writer_rx.try_recv().is_err(),
        "second reliable frame must not bypass pending retry frame"
    );
    let mut queued_tasks = Vec::new();
    for (_, frame) in retry_buffer[&channel].iter() {
        if let Some(task) = completion_task(frame) {
            queued_tasks.push(task);
        }
    }
    assert_eq!(queued_tasks, vec!["fifo-1", "fifo-2"]);

    // The writer holds one frame at a time, so each drain releases exactly one.
    let drained_first = drain_retry_buffer_for_channel(&writer_tx, channel, &mut retry_buffer);
    assert_eq!(drained_first, 1);
    let first_sent = writer_rx.try_recv().expect("first reliable push");
    assert_eq!(push_frame_task_id(&first_sent).as_deref(), Some("fifo-1"));

    let drained_second = drain_retry_buffer_for_channel(&writer_tx, channel, &mut retry_buffer);
    assert_eq!(drained_second, 1);
    let second_sent = writer_rx.try_recv().expect("second reliable push");
    assert_eq!(push_frame_task_id(&second_sent).as_deref(), Some("fifo-2"));
    assert!(!retry_buffer.contains_key(&channel));
}
4252
/// Replay sends only as many buffered frames as the writer queue can take, keeps the
/// rest buffered in order, and finishes on a later call once the queue has room.
#[test]
fn replay_buffered_push_frames_drains_incrementally_on_backpressure() {
    let (_root_dir, root) = test_root("subc-incremental-replay-root");
    let key = ReplayKey {
        root,
        harness: String::from("opencode"),
        session: String::from("session-1"),
    };
    let channel = route_key(4);

    // Capacity two with one slot pre-filled: the first replay can place one frame only.
    let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(2);
    let filler = Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap();
    writer_tx.try_send(filler).expect("prefill writer queue");

    let mut push_buffer = HashMap::new();
    for task in ["replay-1", "replay-2", "replay-3"] {
        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(task));
    }

    let first_pass = replay_buffered_push_frames(
        &writer_tx,
        channel,
        &mut push_buffer,
        &key,
        BindTrust::FirstParty,
    );
    assert_eq!(first_pass, 1);
    assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(2));
    let remaining: Vec<_> = push_buffer[&key]
        .iter()
        .filter_map(completion_task)
        .collect();
    assert_eq!(remaining, vec!["replay-2", "replay-3"]);

    let queued = writer_rx.try_recv().expect("prefilled frame");
    assert_eq!(queued.header.ty, FrameType::Ping);
    let first = writer_rx.try_recv().expect("first replayed push");
    assert_eq!(push_frame_task_id(&first).as_deref(), Some("replay-1"));

    // The queue is empty again, so the remaining two frames both fit.
    let second_pass = replay_buffered_push_frames(
        &writer_tx,
        channel,
        &mut push_buffer,
        &key,
        BindTrust::FirstParty,
    );
    assert_eq!(second_pass, 2);
    let second = writer_rx.try_recv().expect("second replayed push");
    let third = writer_rx.try_recv().expect("third replayed push");
    assert_eq!(push_frame_task_id(&second).as_deref(), Some("replay-2"));
    assert_eq!(push_frame_task_id(&third).as_deref(), Some("replay-3"));
    assert!(!push_buffer.contains_key(&key));
}
4308
/// Migrating a channel's retry buffer moves its pending frames into the push buffer under
/// their replay key and removes the channel's retry entry.
#[test]
fn goodbye_migrates_retry_buffer_into_detach_replay() {
    let (_root_dir, root) = test_root("subc-goodbye-migration-root");
    let key = ReplayKey {
        root,
        harness: String::from("opencode"),
        session: String::from("session-1"),
    };
    let channel = route_key(5);

    let mut retry_buffer = HashMap::new();
    let pending = completion_frame("migrated-task");
    buffer_retry_frame(&mut retry_buffer, channel, key.clone(), pending);
    let mut push_buffer = HashMap::new();

    let migrated =
        migrate_retry_buffer_to_push_buffer(&mut retry_buffer, channel, &mut push_buffer);
    assert_eq!(migrated, 1);

    assert!(!retry_buffer.contains_key(&channel));
    assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(1));
    let migrated_frame = &push_buffer[&key][0];
    assert_eq!(completion_task(migrated_frame), Some("migrated-task"));
}
4338
/// Once the writer's receiving end is gone, both the replay path and the retry path
/// report zero frames sent and discard their buffered frames rather than keeping them.
#[test]
fn permanent_push_send_failure_is_dropped_not_retried_forever() {
    let (_root_dir, root) = test_root("subc-permanent-failure-root");
    let key = ReplayKey {
        root,
        harness: String::from("opencode"),
        session: String::from("session-1"),
    };
    let channel = route_key(4);

    // Closing the receiver makes every later send fail permanently.
    let (writer_tx, writer_rx) = mpsc::channel::<Frame>(1);
    drop(writer_rx);

    // Replay path.
    let mut push_buffer = HashMap::new();
    let replay_frame = completion_frame("closed-replay");
    buffer_push_frame(&mut push_buffer, key.clone(), replay_frame);
    let replayed = replay_buffered_push_frames(
        &writer_tx,
        channel,
        &mut push_buffer,
        &key,
        BindTrust::FirstParty,
    );
    assert_eq!(replayed, 0);
    assert!(!push_buffer.contains_key(&key));

    // Retry path.
    let mut retry_buffer = HashMap::new();
    let retry_frame = completion_frame("closed-retry");
    buffer_retry_frame(&mut retry_buffer, channel, key, retry_frame);
    let drained = drain_retry_buffer_for_channel(&writer_tx, channel, &mut retry_buffer);
    assert_eq!(drained, 0);
    assert!(!retry_buffer.contains_key(&channel));
}
4381
/// A long-running frame is dropped only after its task has been remembered as completed;
/// frames for other tasks are unaffected.
#[test]
fn completed_task_suppresses_stale_long_running_lossy_push() {
    let mut completed_tasks = CompletedTaskIds::default();

    let before_completion = long_running_frame("stale-task", 100);
    assert!(!should_drop_lossy_push(&completed_tasks, &before_completion));

    completed_tasks.remember("stale-task");

    let after_completion = long_running_frame("stale-task", 200);
    assert!(should_drop_lossy_push(&completed_tasks, &after_completion));

    let unrelated = long_running_frame("other-task", 200);
    assert!(!should_drop_lossy_push(&completed_tasks, &unrelated));
}
4401
/// Arming a wake for a channel that is already pending leaves the pending set unchanged
/// but still advances the session's epoch by one.
#[test]
fn arm_bg_wake_bumps_epoch_even_when_channel_is_already_pending() {
    let (_root_dir, root) = test_root("subc-bg-wake-epoch-root");
    let session = String::from("session-1");
    let channel = route_key(7);
    let key = (root.clone(), session.clone());

    let mut bg_wake_pending = HashSet::from([channel]);
    let mut bg_wake_epoch = HashMap::new();
    bg_wake_epoch.insert(key.clone(), 41_u64);

    arm_bg_wake(
        root,
        session,
        channel,
        &mut bg_wake_pending,
        &mut bg_wake_epoch,
    );

    let still_pending = HashSet::from([channel]);
    assert_eq!(bg_wake_pending, still_pending);
    assert_eq!(bg_wake_epoch.get(&key).copied(), Some(42));
}
4422
/// A maintenance pass carrying an epoch captured before a later re-arm must not clear the
/// pending wake: the channel stays pending and the epoch remains one ahead of the
/// captured value.
#[test]
fn stale_maintenance_epoch_does_not_clear_newer_bg_wake() {
    let (_root_dir, root) = test_root("subc-bg-wake-stale-root");
    let session = String::from("session-1");
    let channel = route_key(8);
    let key = (root.clone(), session.clone());

    let mut bg_sub_by_session = HashMap::new();
    bg_sub_by_session.insert(key.clone(), channel);
    let mut bg_wake_pending = HashSet::new();
    let mut bg_wake_epoch = HashMap::new();

    // First arm: this is the epoch the maintenance pass captured.
    arm_bg_wake(
        root.clone(),
        session.clone(),
        channel,
        &mut bg_wake_pending,
        &mut bg_wake_epoch,
    );
    let epoch_at_submit = bg_wake_epoch[&key];

    // Second arm happens afterwards and makes the captured epoch stale.
    arm_bg_wake(
        root.clone(),
        session.clone(),
        channel,
        &mut bg_wake_pending,
        &mut bg_wake_epoch,
    );

    let empty_sessions = [(session, epoch_at_submit)];
    clear_stale_bg_wakes_for_empty_sessions(
        &root,
        &empty_sessions,
        &bg_sub_by_session,
        &mut bg_wake_pending,
        &bg_wake_epoch,
    );

    assert!(bg_wake_pending.contains(&channel));
    assert_eq!(bg_wake_epoch.get(&key).copied(), Some(epoch_at_submit + 1));
}
4461
/// When the maintenance pass carries the session's current epoch, the pending wake for
/// that session's channel is cleared.
#[test]
fn matching_maintenance_epoch_clears_genuinely_stale_bg_wake() {
    let (_root_dir, root) = test_root("subc-bg-wake-clear-root");
    let session = String::from("session-1");
    let channel = route_key(9);
    let key = (root.clone(), session.clone());

    let mut bg_sub_by_session = HashMap::new();
    bg_sub_by_session.insert(key.clone(), channel);
    let mut bg_wake_pending = HashSet::new();
    let mut bg_wake_epoch = HashMap::new();

    arm_bg_wake(
        root.clone(),
        session.clone(),
        channel,
        &mut bg_wake_pending,
        &mut bg_wake_epoch,
    );
    let epoch_at_submit = bg_wake_epoch[&key];

    let empty_sessions = [(session, epoch_at_submit)];
    clear_stale_bg_wakes_for_empty_sessions(
        &root,
        &empty_sessions,
        &bg_sub_by_session,
        &mut bg_wake_pending,
        &bg_wake_epoch,
    );

    assert!(!bg_wake_pending.contains(&channel));
}
4492
/// Only the `actor_fatal` error code counts as a fatal panic; an ordinary
/// `internal_error` response does not.
#[test]
fn response_is_fatal_panic_only_matches_panic_exclusive_code() {
    let ordinary = Response::error("request-1", "internal_error", "ordinary tool error");
    let fatal = Response::error("request-2", "actor_fatal", "mutating panic");

    assert!(!response_is_fatal_panic(&ordinary));
    assert!(response_is_fatal_panic(&fatal));
}
4501
4502 #[tokio::test]
4503 async fn persistent_cancel_resolves_when_fired_before_await() {
4504 let signal = PersistentCancelSignal::new();
4508 signal.cancel();
4509 tokio::time::timeout(Duration::from_secs(1), signal.cancelled())
4511 .await
4512 .expect("cancelled() must resolve when cancel fired beforehand");
4513
4514 let racing = PersistentCancelSignal::new();
4516 let racing_for_task = racing.clone();
4517 let waiter = tokio::spawn(async move { racing_for_task.cancelled().await });
4518 racing.cancel();
4519 tokio::time::timeout(Duration::from_secs(1), waiter)
4520 .await
4521 .expect("cancelled() must resolve when cancel races the await")
4522 .expect("waiter task panicked");
4523 }
4524
4525 #[tokio::test]
4526 async fn control_send_times_out_when_writer_queue_remains_full() {
4527 let (writer_tx, _writer_rx) = mpsc::channel::<Frame>(1);
4528 writer_tx
4529 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
4530 .expect("prefill writer queue");
4531 let started = Instant::now();
4532
4533 let result = send_frame(
4534 &writer_tx,
4535 Frame::build(FrameType::Pong, control_flags(), 0, 2, Vec::new()).unwrap(),
4536 )
4537 .await;
4538
4539 assert!(matches!(result, Err(SubcError::WriterBackpressureTimeout)));
4540 assert!(
4541 started.elapsed() < Duration::from_secs(2),
4542 "control send guard should be bounded"
4543 );
4544 }
4545
/// Names of the tools the manifest tests look up, listed in the same order in which
/// `build_manifest` registers them.
const CORE_TOOLS: [&str; 21] = [
    "status",
    "bash",
    "read",
    "write",
    "edit",
    "apply_patch",
    "grep",
    "glob",
    "search",
    "outline",
    "zoom",
    "inspect",
    "callgraph",
    "conflicts",
    "ast_search",
    "ast_replace",
    "delete",
    "move",
    "import",
    "refactor",
    "safety",
];
4569
4570 fn is_bare_placeholder_schema(schema: &Value) -> bool {
4571 schema == &json!({ "type": "object" })
4572 }
4573
/// Every core tool in the manifest must carry a real embedded object schema rather than
/// the bare placeholder. Spot checks: `read` exposes `filePath`, and `status` has empty
/// properties and forbids additional ones.
#[test]
fn build_manifest_serves_embedded_tool_schemas() {
    let manifest = build_manifest();
    let Some(ProviderRole::ToolProvider { tools, .. }) = manifest.provides.first() else {
        panic!("expected ToolProvider");
    };

    let mut by_name: HashMap<&str, &Tool> = HashMap::new();
    for tool in tools.iter() {
        by_name.insert(tool.name.as_str(), tool);
    }

    for name in CORE_TOOLS {
        let Some(tool) = by_name.get(name) else {
            panic!("missing tool {name}");
        };
        assert!(
            !is_bare_placeholder_schema(&tool.schema),
            "{name} must not use bare placeholder schema"
        );
        let schema_type = tool.schema.get("type").and_then(|v| v.as_str());
        assert_eq!(
            schema_type,
            Some("object"),
            "{name} schema must be an object"
        );
    }

    let read_props = by_name["read"]
        .schema
        .get("properties")
        .and_then(|p| p.as_object())
        .expect("read schema properties");
    assert!(
        read_props.contains_key("filePath"),
        "read schema must expose filePath"
    );

    let status = &by_name["status"].schema;
    let no_properties = serde_json::Map::new();
    assert_eq!(
        status.get("properties").and_then(|v| v.as_object()),
        Some(&no_properties),
        "status schema must have empty properties"
    );
    assert_eq!(
        status.get("additionalProperties").and_then(|v| v.as_bool()),
        Some(false),
        "status schema must forbid additionalProperties"
    );
}
4619
/// Pins the `execution_mode` the manifest advertises for each listed tool:
/// the first table must be `Pure`, the second `Mutating`.
#[test]
fn build_manifest_classifies_execution_mode_by_observable_effect() {
    let manifest = build_manifest();
    let Some(ProviderRole::ToolProvider { tools, .. }) = manifest.provides.first() else {
        panic!("expected ToolProvider");
    };
    let index: HashMap<&str, &Tool> = tools
        .iter()
        .map(|tool| (tool.name.as_str(), tool))
        .collect();

    let expected_pure = [
        "status",
        "read",
        "grep",
        "glob",
        "search",
        "outline",
        "zoom",
        "inspect",
        "callgraph",
        "conflicts",
        "ast_search",
    ];
    let expected_mutating = [
        "bash",
        "write",
        "edit",
        "apply_patch",
        "ast_replace",
        "delete",
        "move",
        "import",
        "refactor",
        "safety",
    ];

    for name in expected_pure {
        let advertised = &index[name].execution_mode;
        assert_eq!(
            *advertised,
            ExecutionMode::Pure,
            "{name} produces no observable side effect and must be Pure"
        );
    }
    for name in expected_mutating {
        let advertised = &index[name].execution_mode;
        assert_eq!(
            *advertised,
            ExecutionMode::Mutating,
            "{name} writes files and must be Mutating"
        );
    }
}
4670
/// `callgraph` must map to `Lane::HeavyInit` and `conflicts` to
/// `Lane::PureRead`.
#[test]
fn subc_agent_lanes_classify_new_read_tools() {
    // Compared as one array so a failure prints every observed lane.
    let observed = ["callgraph", "conflicts"].map(command_lane);
    assert_eq!(observed, [Lane::HeavyInit, Lane::PureRead]);
}
4676
/// The native plumbing allowlist must admit the two completion commands,
/// reject the other listed names, keep the two commands out of the agent
/// core tool set, and route them to the expected lanes.
#[test]
fn native_plumbing_allowlist_admits_exactly_drain_and_ack() {
    let drain = "bash_drain_completions";
    let ack = "bash_ack_completions";

    assert!(is_subc_native_plumbing_tool(drain));
    assert!(is_subc_native_plumbing_tool(ack));

    // None of these names may pass the allowlist; any that do are collected
    // so the failure output names them.
    let wrongly_admitted: Vec<&str> = ["configure", "bash", "bash_kill", "db_set_state", "undo"]
        .iter()
        .copied()
        .filter(|&tool| is_subc_native_plumbing_tool(tool))
        .collect();
    assert_eq!(wrongly_admitted, Vec::<&str>::new());

    assert!(!is_subc_agent_core_tool(drain));
    assert!(!is_subc_agent_core_tool(ack));

    assert_eq!(command_lane(drain), Lane::PureRead);
    assert_eq!(command_lane(ack), Lane::Mutating);
}
4703
/// The tool response frame must embed, under `structuredContent`, the same
/// flat object that `flat_tool_response` produces, for both a success and an
/// error response, alongside `isError` and (for success) the text content.
#[test]
fn tool_response_frame_carries_flat_standalone_shape_in_structured_content() {
    use crate::protocol::Response;

    // Success response.
    let ok_result = ToolCallResult {
        text: "rendered text".to_string(),
        response: Response::success(
            "req-7",
            json!({
                "complete": true,
                "matches": 3,
                "status_bar": { "errors": 0, "warnings": 1 },
                "bg_completions": [{ "task_id": "bash-abc" }],
            }),
        ),
    };

    // Expected flat shape: id/success, the payload fields, and the text.
    let flat_ok = json!({
        "id": "req-7",
        "success": true,
        "complete": true,
        "matches": 3,
        "status_bar": { "errors": 0, "warnings": 1 },
        "bg_completions": [{ "task_id": "bash-abc" }],
        "text": "rendered text",
    });
    assert_eq!(
        flat_tool_response(&ok_result.response, &ok_result.text),
        flat_ok,
        "structuredContent must be byte-identical to the standalone flat response"
    );

    let ok_frame =
        build_tool_response_frame(PROTOCOL_VERSION, 1, 42, control_flags(), &ok_result).unwrap();
    let ok_body: Value = serde_json::from_slice(&ok_frame.body).unwrap();
    assert_eq!(ok_body["isError"], json!(false));
    let first_content = &ok_body["content"][0];
    assert_eq!(first_content["type"], json!("text"));
    assert_eq!(first_content["text"], json!("rendered text"));
    assert_eq!(ok_body["structuredContent"], flat_ok);

    // Error response with attached data.
    let failed_result = ToolCallResult {
        text: "error text".to_string(),
        response: Response::error_with_data(
            "req-8",
            "ambiguous_match",
            "too many matches",
            json!({ "candidates": ["a", "b"] }),
        ),
    };
    let failed_frame =
        build_tool_response_frame(PROTOCOL_VERSION, 1, 43, control_flags(), &failed_result)
            .unwrap();
    let failed_body: Value = serde_json::from_slice(&failed_frame.body).unwrap();
    assert_eq!(failed_body["isError"], json!(true));

    let structured = &failed_body["structuredContent"];
    assert_eq!(structured["success"], json!(false));
    assert_eq!(structured["code"], json!("ambiguous_match"));
    assert_eq!(structured["candidates"], json!(["a", "b"]));
    assert_eq!(structured["text"], json!("error text"));
}
4780}
4781
/// Errors reported by the subc client code in this file.
///
/// The `Display` impl gives the user-facing message for each variant.
#[derive(Debug)]
pub enum SubcError {
    /// Building the subc tokio runtime failed.
    Runtime(std::io::Error),
    /// The subc connection file at `path` could not be read.
    ConnectionFile {
        path: PathBuf,
        source: subc_transport::ConnectionFileError,
    },
    /// The connection file at `path` has no endpoints.
    NoEndpoint {
        path: PathBuf,
    },
    /// The connection file at `path` contains the invalid `endpoint`.
    InvalidEndpoint {
        path: PathBuf,
        endpoint: String,
    },
    /// Connecting to `endpoint` failed.
    Connect {
        endpoint: String,
        source: std::io::Error,
    },
    /// Authenticating to `endpoint` failed.
    Auth {
        endpoint: String,
        source: subc_transport::AuthError,
    },
    /// Frame I/O error.
    FrameIo(subc_transport::FrameIoError),
    /// A frame could not be built.
    FrameBuild(subc_protocol::FrameBuildError),
    /// The writer task has closed.
    WriterClosed,
    /// The writer task stayed backpressured while a control frame was being
    /// sent.
    WriterBackpressureTimeout,
    /// Joining the writer task failed.
    WriterJoin(tokio::task::JoinError),
    /// JSON error.
    Json(serde_json::Error),
    /// The daemon closed the connection before sending HelloAck.
    ClosedBeforeHelloAck,
    /// The daemon rejected ModuleHello. `body` is `None` when the error body
    /// could not be parsed.
    HelloRejected {
        body: Option<ErrorBody>,
    },
    /// A frame of type `ty` arrived in place of HelloAck.
    UnexpectedFrame {
        ty: FrameType,
    },
}
4818
4819impl fmt::Display for SubcError {
4820 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4821 match self {
4822 Self::Runtime(e) => write!(f, "failed to build subc tokio runtime: {e}"),
4823 Self::ConnectionFile { path, source } => {
4824 write!(f, "failed to read subc connection file {path:?}: {source}")
4825 }
4826 Self::NoEndpoint { path } => {
4827 write!(f, "subc connection file {path:?} has no endpoints")
4828 }
4829 Self::InvalidEndpoint { path, endpoint } => {
4830 write!(
4831 f,
4832 "subc connection file {path:?} has invalid endpoint {endpoint}"
4833 )
4834 }
4835 Self::Connect { endpoint, source } => {
4836 write!(f, "failed to connect to subc endpoint {endpoint}: {source}")
4837 }
4838 Self::Auth { endpoint, source } => {
4839 write!(
4840 f,
4841 "failed to authenticate to subc endpoint {endpoint}: {source}"
4842 )
4843 }
4844 Self::FrameIo(e) => write!(f, "subc frame I/O error: {e}"),
4845 Self::FrameBuild(e) => write!(f, "subc frame build error: {e}"),
4846 Self::WriterClosed => write!(f, "subc writer task closed"),
4847 Self::WriterBackpressureTimeout => write!(
4848 f,
4849 "subc writer task stayed backpressured while sending a control frame"
4850 ),
4851 Self::WriterJoin(e) => write!(f, "subc writer task join error: {e}"),
4852 Self::Json(e) => write!(f, "subc JSON error: {e}"),
4853 Self::ClosedBeforeHelloAck => {
4854 write!(f, "subc daemon closed the connection before HelloAck")
4855 }
4856 Self::HelloRejected { body } => match body {
4857 Some(b) => write!(f, "subc rejected ModuleHello: {} ({})", b.code, b.message),
4858 None => write!(f, "subc rejected ModuleHello (unparseable error body)"),
4859 },
4860 Self::UnexpectedFrame { ty } => {
4861 write!(f, "subc sent unexpected frame in place of HelloAck: {ty:?}")
4862 }
4863 }
4864 }
4865}
4866
// Uses the trait's default `source()`, which returns `None`; the `Display`
// impl includes the wrapped errors' messages inline instead.
impl std::error::Error for SubcError {}