Skip to main content

aft/
subc.rs

1//! subc daemon attach — transport edge.
2//!
3//! When AFT is launched as `aft --subc <connection-file>`, it does NOT run the
4//! standalone NDJSON-over-stdin loop. Instead it connects to a running subc
5//! daemon over loopback TCP, authenticates with the pre-envelope HMAC handshake
6//! (`subc-transport`), then speaks the subc frame protocol (`subc-protocol`):
7//! ModuleHello → HelloAck (register as a tool provider), then a channel-0
8//! control loop (Ping/Pong, RouteBind) plus route-channel tool calls.
9//!
10//! Concurrency: subc routes tool calls through the executor. The tokio
11//! edge never dispatches against `AppContext` inline; per-actor executor lanes
12//! own the reader/mutator epoch, while a writer task serializes outbound frames.
13
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::fmt;
16use std::net::{IpAddr, SocketAddr};
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, LazyLock};
20use std::time::{Duration, Instant};
21
22use serde::Deserialize;
23use serde_json::{json, Value};
24
25use crate::config::Config;
26use crate::config_resolve::ConfigTier;
27use crate::context::{App, AppContext, ProgressSender};
28use crate::executor::{Executor, Lane};
29use crate::jsonc::strip_jsonc;
30use crate::log_ctx;
31use crate::path_identity::ProjectRootId;
32use crate::protocol::{ProgressKind, PushFrame, RawRequest, Response};
33use crate::run_tool_call::{run_tool_call, ToolCallContext, ToolCallOutcome, ToolCallResult};
34use crate::runtime_drain;
35
36use subc_protocol::manifest::{
37    Bindings, Concurrency, ExecutionMode, IdentityBinding, IdentityScope, ModuleManifest,
38    ProviderRole, StorageBinding, StorageKind, StorageScope, Tool, TrustTier,
39};
40use subc_protocol::session::{ModuleControlRequest, ModuleControlResponse};
41use subc_protocol::{
42    ErrorBody, Flags, Frame, FrameType, ModuleHelloBody, Principal, Priority, PROTOCOL_VERSION,
43};
44use subc_transport::{authenticate_client, connection_file, read_frame, write_frame};
45use tokio::io::{AsyncRead, AsyncWrite};
46use tokio::net::TcpStream;
47use tokio::sync::{mpsc, oneshot, Notify};
48use tokio::task::JoinHandle;
49
50/// Handshake budget. subc binds-before-spawn, so a reachable daemon authenticates
51/// well within this; an unreachable/socket-stale daemon fails loud rather than
52/// silently downgrading to standalone (the --subc contract).
53const AUTH_DEADLINE: Duration = Duration::from_secs(5);
54
55/// Correlation id for the initial ModuleHello (channel 0).
56const HELLO_CORR: u64 = 1;
57
58/// Per-session in-memory replay cap for must-deliver Push frames. This covers
59/// detach/re-attach while AFT stays alive; cross-restart replay is phased later.
60const PUSH_BUFFER_MAX_PER_KEY: usize = 256;
61
62/// Bounded guard for control-frame sends. If the daemon stops reading and the
63/// writer queue stays full, tear the subc edge down instead of stalling the
64/// route loop indefinitely.
65const CONTROL_SEND_TIMEOUT: Duration = Duration::from_millis(250);
66
67/// Small bounded memory of completed task ids used to suppress stale lossy
68/// long-running reminders that arrive after their reliable completion event.
69const COMPLETED_TASK_SUPPRESSION_MAX: usize = 4096;
70
71/// Bash foreground orchestration polls detached tasks with short read-lane jobs.
72/// The sleep between polls is outside the executor so no read or write worker is
73/// pinned while a foreground command is still running.
74const PENDING_POLL_INTERVAL: Duration = Duration::from_millis(100);
75
76type RouteChannel = u32;
77type PushEnvelope = (ProjectRootId, PushFrame);
78type RetryBuffer = HashMap<RouteChannel, VecDeque<(ReplayKey, PushFrame)>>;
79
80#[derive(Clone)]
81struct PushSenders {
82    lossy_tx: mpsc::Sender<PushEnvelope>,
83    reliable_tx: mpsc::UnboundedSender<PushEnvelope>,
84}
85
86#[derive(Clone)]
87struct PersistentCancelSignal {
88    inner: Arc<PersistentCancelInner>,
89}
90
91struct PersistentCancelInner {
92    cancelled: AtomicBool,
93    notify: Notify,
94}
95
96impl PersistentCancelSignal {
97    fn new() -> Self {
98        Self {
99            inner: Arc::new(PersistentCancelInner {
100                cancelled: AtomicBool::new(false),
101                notify: Notify::new(),
102            }),
103        }
104    }
105
106    fn cancel(&self) {
107        if !self.inner.cancelled.swap(true, Ordering::SeqCst) {
108            self.inner.notify.notify_waiters();
109        }
110    }
111
112    fn is_cancelled(&self) -> bool {
113        self.inner.cancelled.load(Ordering::SeqCst)
114    }
115
116    async fn cancelled(&self) {
117        // `enable()` REGISTERS this waiter before we read the flag, closing the
118        // lost-wakeup window: `notify_waiters()` only wakes already-registered
119        // waiters and stores no permit, so without enable() a `cancel()` firing
120        // between the flag read and `.await` would be missed and the future
121        // would park forever (cancel() fires only once). With enable(), a cancel
122        // racing the flag read still wakes the registered waiter. The loop is a
123        // belt-and-suspenders re-check on spurious wakeups.
124        loop {
125            let notified = self.inner.notify.notified();
126            tokio::pin!(notified);
127            notified.as_mut().enable();
128            if self.is_cancelled() {
129                return;
130            }
131            notified.await;
132        }
133    }
134}
135
136#[derive(Clone)]
137struct BashWaitCancel {
138    connection: PersistentCancelSignal,
139    route: PersistentCancelSignal,
140}
141
142impl BashWaitCancel {
143    async fn cancelled(&self) {
144        tokio::select! {
145            _ = self.connection.cancelled() => {}
146            _ = self.route.cancelled() => {}
147        }
148    }
149}
150
151struct RouteBashCancel {
152    token: PersistentCancelSignal,
153    active_waits: usize,
154}
155
156struct BashDeferredCompletion {
157    channel: u16,
158    corr: u64,
159    flags: Flags,
160    ver: u8,
161    root: ProjectRootId,
162    request_id: String,
163    result: Option<ToolCallResult>,
164    fatal: bool,
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq)]
168pub(crate) enum BindTrust {
169    FirstParty,
170    Untrusted,
171}
172
173impl BindTrust {
174    fn allows_bash_observation(self) -> bool {
175        matches!(self, Self::FirstParty)
176    }
177
178    fn label(self) -> &'static str {
179        match self {
180            Self::FirstParty => "first_party",
181            Self::Untrusted => "untrusted",
182        }
183    }
184}
185
186pub(crate) fn trust_for_principal(principal: &Option<Principal>) -> BindTrust {
187    match principal {
188        Some(Principal::Direct) => BindTrust::FirstParty,
189        Some(Principal::Reserved { module_id })
190            if module_id == "llm-runner" || module_id == "aft" =>
191        {
192            BindTrust::FirstParty
193        }
194        Some(Principal::Reserved { .. }) | Some(Principal::Unverified) | None => {
195            BindTrust::Untrusted
196        }
197    }
198}
199
200fn principal_label(principal: &Option<Principal>) -> String {
201    match principal {
202        Some(Principal::Direct) => "direct".to_string(),
203        Some(Principal::Reserved { module_id }) => format!("reserved:{module_id}"),
204        Some(Principal::Unverified) => "unverified".to_string(),
205        None => "absent".to_string(),
206    }
207}
208
209#[derive(Debug)]
210/// Per-root route metadata owned by the subc loop. The `active_bash_waits` field
211/// counts detached bash processes that are still being observed for this root.
212/// Any future logic that evicts roots based on idle time must not evict a root
213/// while this count is greater than zero, because a foreground bash response may
214/// still arrive later.
215struct RootMeta {
216    maintenance_pending: bool,
217    last_touched: Instant,
218    diagnostics_on_edit: bool,
219    active_bash_waits: usize,
220}
221
222#[derive(Debug)]
223struct PendingBind {
224    bind_root_id: ProjectRootId,
225    inserted_new_actor: bool,
226    cancelled: bool,
227}
228
229struct RouteBindCompletion {
230    route_channel: u16,
231    identity: RouteIdentity,
232    bind_root_id: ProjectRootId,
233    inserted_new_actor: bool,
234    configure_response: Response,
235    drain_response: Option<Response>,
236    diagnostics_on_edit: bool,
237    ver: u8,
238    corr: u64,
239    flags: Flags,
240}
241
242#[derive(Debug, Clone)]
243struct RouteIdentity {
244    root: ProjectRootId,
245    project_root: PathBuf,
246    harness: String,
247    session: String,
248    trust: BindTrust,
249}
250
251#[derive(Debug, Clone)]
252struct RetainedSessionIdentity {
253    harness: String,
254    trust: BindTrust,
255}
256
257#[derive(Clone, Copy)]
258struct BgSub {
259    corr: u64,
260    ver: u8,
261    flags: Flags,
262}
263
264struct MaintenanceCompletion {
265    root_id: ProjectRootId,
266    response: Response,
267    empty_bg_sessions: Vec<(String, u64)>,
268}
269
270#[derive(Debug, Clone, PartialEq, Eq, Hash)]
271struct ReplayKey {
272    root: ProjectRootId,
273    harness: String,
274    session: String,
275}
276
277impl ReplayKey {
278    fn from_identity(identity: &RouteIdentity) -> Self {
279        Self {
280            root: identity.root.clone(),
281            harness: identity.harness.clone(),
282            session: identity.session.clone(),
283        }
284    }
285}
286
287#[derive(Debug, Default)]
288struct CompletedTaskIds {
289    order: VecDeque<String>,
290    set: HashSet<String>,
291}
292
293impl CompletedTaskIds {
294    fn remember(&mut self, task_id: &str) {
295        if self.set.contains(task_id) {
296            return;
297        }
298        if self.order.len() >= COMPLETED_TASK_SUPPRESSION_MAX {
299            if let Some(evicted) = self.order.pop_front() {
300                self.set.remove(&evicted);
301            }
302        }
303        let task_id = task_id.to_string();
304        self.order.push_back(task_id.clone());
305        self.set.insert(task_id);
306    }
307
308    fn contains(&self, task_id: &str) -> bool {
309        self.set.contains(task_id)
310    }
311}
312
313impl RootMeta {
314    fn new(now: Instant) -> Self {
315        Self {
316            maintenance_pending: false,
317            last_touched: now,
318            diagnostics_on_edit: false,
319            active_bash_waits: 0,
320        }
321    }
322
323    fn touch(&mut self) {
324        self.last_touched = Instant::now();
325    }
326}
327
328fn route_key(channel: u16) -> RouteChannel {
329    RouteChannel::from(channel)
330}
331
332fn remove_root_channel(
333    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
334    root: &ProjectRootId,
335    channel: RouteChannel,
336) {
337    let remove_root = if let Some(channels) = root_channels.get_mut(root) {
338        channels.remove(&channel);
339        channels.is_empty()
340    } else {
341        false
342    };
343    if remove_root {
344        root_channels.remove(root);
345    }
346}
347
348fn remove_route_channel(
349    routes: &mut HashMap<RouteChannel, RouteIdentity>,
350    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
351    channel: RouteChannel,
352) -> Option<RouteIdentity> {
353    let removed = routes.remove(&channel);
354    if let Some(identity) = &removed {
355        remove_root_channel(root_channels, &identity.root, channel);
356    }
357    removed
358}
359
360fn insert_route_channel(
361    routes: &mut HashMap<RouteChannel, RouteIdentity>,
362    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
363    channel: RouteChannel,
364    identity: RouteIdentity,
365) {
366    if let Some(previous) = routes.insert(channel, identity.clone()) {
367        remove_root_channel(root_channels, &previous.root, channel);
368    }
369    root_channels
370        .entry(identity.root.clone())
371        .or_default()
372        .insert(channel);
373}
374
375fn remove_bg_subscription_index(
376    bg_sub_by_session: &mut HashMap<(ProjectRootId, String), RouteChannel>,
377    channel: RouteChannel,
378    identity: Option<&RouteIdentity>,
379) {
380    if let Some(identity) = identity {
381        let key = (identity.root.clone(), identity.session.clone());
382        if bg_sub_by_session.get(&key).copied() == Some(channel) {
383            bg_sub_by_session.remove(&key);
384        }
385    } else {
386        bg_sub_by_session.retain(|_, mapped_channel| *mapped_channel != channel);
387    }
388}
389
390fn end_bg_subscription(
391    writer_tx: &mpsc::Sender<Frame>,
392    bg_subs: &mut HashMap<RouteChannel, BgSub>,
393    bg_sub_by_session: &mut HashMap<(ProjectRootId, String), RouteChannel>,
394    bg_wake_pending: &mut HashSet<RouteChannel>,
395    channel: RouteChannel,
396    identity: Option<&RouteIdentity>,
397) {
398    if let Some(sub) = bg_subs.get(&channel).copied() {
399        let _ = try_send_bg_stream_end(writer_tx, channel, &sub);
400        bg_subs.remove(&channel);
401        bg_wake_pending.remove(&channel);
402        remove_bg_subscription_index(bg_sub_by_session, channel, identity);
403    }
404}
405
406fn remember_session_identity(
407    session_identity: &mut HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
408    identity: &RouteIdentity,
409) {
410    let key = (identity.root.clone(), identity.session.clone());
411    if matches!(identity.trust, BindTrust::Untrusted)
412        && session_identity
413            .get(&key)
414            .is_some_and(|retained| matches!(retained.trust, BindTrust::FirstParty))
415    {
416        return;
417    }
418
419    // Retained after route Goodbye so reliable session-scoped frames emitted while
420    // the session is detached can still be keyed by the full (root,harness,session)
421    // replay triple. Untrusted binds never overwrite a retained first-party
422    // session identity, because bash completion replay is an observation channel.
423    session_identity.insert(
424        key,
425        RetainedSessionIdentity {
426            harness: identity.harness.clone(),
427            trust: identity.trust,
428        },
429    );
430}
431
432fn replay_key_for_session(
433    session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
434    root: &ProjectRootId,
435    session: &str,
436) -> Option<(ReplayKey, BindTrust)> {
437    let retained = session_identity.get(&(root.clone(), session.to_string()))?;
438    Some((
439        ReplayKey {
440            root: root.clone(),
441            harness: retained.harness.clone(),
442            session: session.to_string(),
443        },
444        retained.trust,
445    ))
446}
447
448fn frame_session(frame: &PushFrame) -> Option<&str> {
449    match frame {
450        PushFrame::BashCompleted(completed) => Some(completed.session_id.as_str()),
451        PushFrame::BashLongRunning(long_running) => Some(long_running.session_id.as_str()),
452        PushFrame::BashPatternMatch(pattern_match) => Some(pattern_match.session_id.as_str()),
453        PushFrame::ConfigureWarnings(warnings) => warnings.session_id.as_deref(),
454        PushFrame::StatusChanged(status) => status.session_id.as_deref(),
455        PushFrame::Progress(_) => None,
456    }
457}
458
459fn frame_is_reliable(frame: &PushFrame) -> bool {
460    matches!(
461        frame,
462        PushFrame::BashCompleted(_)
463            | PushFrame::BashPatternMatch(_)
464            | PushFrame::ConfigureWarnings(_)
465    )
466}
467
468fn frame_is_bash_observation(frame: &PushFrame) -> bool {
469    matches!(
470        frame,
471        PushFrame::BashCompleted(_)
472            | PushFrame::BashLongRunning(_)
473            | PushFrame::BashPatternMatch(_)
474    )
475}
476
477fn completed_task_id(frame: &PushFrame) -> Option<&str> {
478    match frame {
479        PushFrame::BashCompleted(completed) => Some(completed.task_id.as_str()),
480        _ => None,
481    }
482}
483
484fn completed_bg_session_key(
485    root: &ProjectRootId,
486    frame: &PushFrame,
487) -> Option<(ProjectRootId, String)> {
488    match frame {
489        PushFrame::BashCompleted(completed) => Some((root.clone(), completed.session_id.clone())),
490        _ => None,
491    }
492}
493
494fn long_running_task_id(frame: &PushFrame) -> Option<&str> {
495    match frame {
496        PushFrame::BashLongRunning(long_running) => Some(long_running.task_id.as_str()),
497        _ => None,
498    }
499}
500
501fn should_drop_lossy_push(completed_tasks: &CompletedTaskIds, frame: &PushFrame) -> bool {
502    long_running_task_id(frame).is_some_and(|task_id| completed_tasks.contains(task_id))
503}
504
505fn progress_sender_for_root(push_senders: PushSenders, root_id: ProjectRootId) -> ProgressSender {
506    Arc::new(Box::new(move |frame: PushFrame| {
507        // Emitters can run on executor workers, maintenance jobs, watcher drains,
508        // semantic refresh workers, or bg-bash watchdog threads. Never block any
509        // of them on subc routing/backpressure: reliable frames take an
510        // unbounded non-blocking lane; lossy frames stay bounded and coalesced.
511        if frame_is_reliable(&frame) {
512            let _ = push_senders.reliable_tx.send((root_id.clone(), frame));
513        } else {
514            let _ = push_senders.lossy_tx.try_send((root_id.clone(), frame));
515        }
516    }))
517}
518
519#[derive(Debug, Clone, PartialEq, Eq, Hash)]
520enum LossyProgressKind {
521    Stdout,
522    Stderr,
523}
524
525impl From<&ProgressKind> for LossyProgressKind {
526    fn from(kind: &ProgressKind) -> Self {
527        match kind {
528            ProgressKind::Stdout => Self::Stdout,
529            ProgressKind::Stderr => Self::Stderr,
530        }
531    }
532}
533
534#[derive(Debug, Clone, PartialEq, Eq, Hash)]
535enum LossyPushKey {
536    Progress {
537        request_id: String,
538        kind: LossyProgressKind,
539    },
540    StatusChanged,
541    BashLongRunning {
542        task_id: String,
543    },
544}
545
546fn lossy_push_key(frame: &PushFrame) -> Option<LossyPushKey> {
547    match frame {
548        PushFrame::Progress(progress) => Some(LossyPushKey::Progress {
549            request_id: progress.request_id.clone(),
550            kind: LossyProgressKind::from(&progress.kind),
551        }),
552        PushFrame::StatusChanged(_) => Some(LossyPushKey::StatusChanged),
553        PushFrame::BashLongRunning(long_running) => Some(LossyPushKey::BashLongRunning {
554            task_id: long_running.task_id.clone(),
555        }),
556        PushFrame::BashCompleted(_)
557        | PushFrame::BashPatternMatch(_)
558        | PushFrame::ConfigureWarnings(_) => None,
559    }
560}
561
562fn coalesce_push_batch(batch: Vec<(ProjectRootId, PushFrame)>) -> Vec<(ProjectRootId, PushFrame)> {
563    let mut slots: Vec<Option<(ProjectRootId, PushFrame)>> = Vec::with_capacity(batch.len());
564    let mut latest_lossy: HashMap<(ProjectRootId, LossyPushKey), usize> = HashMap::new();
565
566    for (root, frame) in batch {
567        if let Some(lossy_key) = lossy_push_key(&frame) {
568            let map_key = (root.clone(), lossy_key);
569            if let Some(previous_index) = latest_lossy.insert(map_key, slots.len()) {
570                slots[previous_index] = None;
571            }
572        }
573        slots.push(Some((root, frame)));
574    }
575
576    slots.into_iter().flatten().collect()
577}
578
579#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
580struct FanOutResult {
581    /// Channels matching the frame's project/session scope. Reliable Push frames
582    /// that match a channel but hit writer backpressure are held in retry_buffer
583    /// instead of being mistaken for detach replay.
584    matched_channels: usize,
585    /// Frames accepted by the writer queue immediately. Lossy frames that are not
586    /// accepted are dropped; reliable frames are retried on transient backpressure.
587    sent_frames: usize,
588}
589
590#[derive(Debug, Clone, Copy, PartialEq, Eq)]
591enum PushSendOutcome {
592    Sent,
593    Backpressure,
594    PermanentFailure,
595}
596
597fn try_send_push_body(
598    writer_tx: &mpsc::Sender<Frame>,
599    channel: RouteChannel,
600    body: &[u8],
601) -> PushSendOutcome {
602    let Ok(route_channel) = u16::try_from(channel) else {
603        log::warn!("subc attach: invalid route channel {channel} for Push fan-out");
604        return PushSendOutcome::PermanentFailure;
605    };
606    let push_frame = match Frame::build_with_version(
607        PROTOCOL_VERSION,
608        FrameType::Push,
609        control_flags(),
610        route_channel,
611        0,
612        body.to_vec(),
613    ) {
614        Ok(frame) => frame,
615        Err(error) => {
616            log::warn!("subc attach: failed to build Push frame: {error}");
617            return PushSendOutcome::PermanentFailure;
618        }
619    };
620    match writer_tx.try_send(push_frame) {
621        Ok(()) => PushSendOutcome::Sent,
622        Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
623        Err(mpsc::error::TrySendError::Closed(_)) => {
624            log::warn!("subc attach: writer closed while sending Push frame");
625            PushSendOutcome::PermanentFailure
626        }
627    }
628}
629
630fn try_send_push_frame(
631    writer_tx: &mpsc::Sender<Frame>,
632    channel: RouteChannel,
633    frame: &PushFrame,
634) -> PushSendOutcome {
635    let body = match serde_json::to_vec(frame) {
636        Ok(body) => body,
637        Err(error) => {
638            log::warn!("subc attach: failed to serialize PushFrame: {error}");
639            return PushSendOutcome::PermanentFailure;
640        }
641    };
642    try_send_push_body(writer_tx, channel, &body)
643}
644
645fn try_send_bg_stream_frame(
646    writer_tx: &mpsc::Sender<Frame>,
647    channel: RouteChannel,
648    sub: &BgSub,
649    ty: FrameType,
650    body: Vec<u8>,
651) -> PushSendOutcome {
652    let Ok(route_channel) = u16::try_from(channel) else {
653        log::warn!("subc attach: invalid route channel {channel} for bg_events stream");
654        return PushSendOutcome::PermanentFailure;
655    };
656    let frame =
657        match Frame::build_with_version(sub.ver, ty, sub.flags, route_channel, sub.corr, body) {
658            Ok(frame) => frame,
659            Err(error) => {
660                log::warn!("subc attach: failed to build bg_events stream frame: {error}");
661                return PushSendOutcome::PermanentFailure;
662            }
663        };
664    match writer_tx.try_send(frame) {
665        Ok(()) => PushSendOutcome::Sent,
666        Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
667        Err(mpsc::error::TrySendError::Closed(_)) => {
668            log::warn!("subc attach: writer closed while sending bg_events stream frame");
669            PushSendOutcome::PermanentFailure
670        }
671    }
672}
673
674fn try_send_bg_stream_data(
675    writer_tx: &mpsc::Sender<Frame>,
676    channel: RouteChannel,
677    sub: &BgSub,
678) -> PushSendOutcome {
679    let body = match serde_json::to_vec(&json!({ "op": "bg_events" })) {
680        Ok(body) => body,
681        Err(error) => {
682            log::warn!("subc attach: failed to serialize bg_events stream payload: {error}");
683            return PushSendOutcome::PermanentFailure;
684        }
685    };
686    try_send_bg_stream_frame(writer_tx, channel, sub, FrameType::StreamData, body)
687}
688
689fn try_send_bg_stream_end(
690    writer_tx: &mpsc::Sender<Frame>,
691    channel: RouteChannel,
692    sub: &BgSub,
693) -> PushSendOutcome {
694    try_send_bg_stream_frame(writer_tx, channel, sub, FrameType::StreamEnd, Vec::new())
695}
696
697fn emit_bg_event_wakes(
698    writer_tx: &mpsc::Sender<Frame>,
699    bg_subs: &HashMap<RouteChannel, BgSub>,
700    bg_wake_pending: &mut HashSet<RouteChannel>,
701) {
702    let pending_channels: Vec<RouteChannel> = bg_wake_pending.iter().copied().collect();
703    let mut stale_channels = Vec::new();
704    for channel in pending_channels {
705        if let Some(sub) = bg_subs.get(&channel) {
706            let _ = try_send_bg_stream_data(writer_tx, channel, sub);
707        } else {
708            stale_channels.push(channel);
709        }
710    }
711    for channel in stale_channels {
712        bg_wake_pending.remove(&channel);
713    }
714}
715
716/// Always bump the epoch for (root, session) when arming a wake on `channel`,
717/// even if the channel was already present in the pending set. This ensures
718/// that later maintenance logic holding an older epoch value cannot suppress a
719/// wake that was armed after the maintenance snapshot was taken.
720fn arm_bg_wake(
721    root: ProjectRootId,
722    session: String,
723    channel: RouteChannel,
724    bg_wake_pending: &mut HashSet<RouteChannel>,
725    bg_wake_epoch: &mut HashMap<(ProjectRootId, String), u64>,
726) {
727    *bg_wake_epoch.entry((root, session)).or_default() += 1;
728    bg_wake_pending.insert(channel);
729}
730
731fn clear_stale_bg_wakes_for_empty_sessions(
732    root_id: &ProjectRootId,
733    empty_bg_sessions: &[(String, u64)],
734    bg_sub_by_session: &HashMap<(ProjectRootId, String), RouteChannel>,
735    bg_wake_pending: &mut HashSet<RouteChannel>,
736    bg_wake_epoch: &HashMap<(ProjectRootId, String), u64>,
737) {
738    for (session, epoch_at_submit) in empty_bg_sessions {
739        let key = (root_id.clone(), session.clone());
740        if bg_wake_epoch.get(&key).copied() == Some(*epoch_at_submit) {
741            if let Some(channel) = bg_sub_by_session.get(&key).copied() {
742                bg_wake_pending.remove(&channel);
743            }
744        }
745    }
746}
747
748fn bounded_push_back<T>(queue: &mut VecDeque<T>, item: T) {
749    if queue.len() >= PUSH_BUFFER_MAX_PER_KEY {
750        queue.pop_front();
751    }
752    queue.push_back(item);
753}
754
755fn buffer_push_frame(
756    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
757    key: ReplayKey,
758    frame: PushFrame,
759) {
760    bounded_push_back(push_buffer.entry(key).or_default(), frame);
761}
762
763fn buffer_retry_frame(
764    retry_buffer: &mut RetryBuffer,
765    channel: RouteChannel,
766    key: ReplayKey,
767    frame: PushFrame,
768) {
769    bounded_push_back(retry_buffer.entry(channel).or_default(), (key, frame));
770}
771
772fn migrate_retry_buffer_to_push_buffer(
773    retry_buffer: &mut RetryBuffer,
774    channel: RouteChannel,
775    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
776) -> usize {
777    let Some(frames) = retry_buffer.remove(&channel) else {
778        return 0;
779    };
780    let migrated = frames.len();
781    for (key, frame) in frames {
782        buffer_push_frame(push_buffer, key, frame);
783    }
784    migrated
785}
786
787fn replay_buffered_push_frames(
788    writer_tx: &mpsc::Sender<Frame>,
789    channel: RouteChannel,
790    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
791    key: &ReplayKey,
792    trust: BindTrust,
793) -> usize {
794    let mut sent = 0;
795    let remove_empty;
796
797    {
798        let Some(queue) = push_buffer.get_mut(key) else {
799            return 0;
800        };
801
802        while let Some(frame) = queue.pop_front() {
803            if frame_is_bash_observation(&frame) && !trust.allows_bash_observation() {
804                continue;
805            }
806            match try_send_push_frame(writer_tx, channel, &frame) {
807                PushSendOutcome::Sent => sent += 1,
808                PushSendOutcome::Backpressure => {
809                    queue.push_front(frame);
810                    break;
811                }
812                PushSendOutcome::PermanentFailure => {
813                    log::warn!(
814                        "subc attach: dropping buffered reliable Push for root {} harness {} session {} after permanent send failure",
815                        key.root.as_path().display(),
816                        key.harness,
817                        key.session
818                    );
819                }
820            }
821        }
822
823        remove_empty = queue.is_empty();
824    }
825
826    if remove_empty {
827        push_buffer.remove(key);
828    }
829
830    sent
831}
832
833fn drain_retry_buffer_for_channel(
834    writer_tx: &mpsc::Sender<Frame>,
835    channel: RouteChannel,
836    retry_buffer: &mut RetryBuffer,
837) -> usize {
838    let mut sent = 0;
839    let remove_empty;
840
841    {
842        let Some(queue) = retry_buffer.get_mut(&channel) else {
843            return 0;
844        };
845
846        while let Some((key, frame)) = queue.pop_front() {
847            match try_send_push_frame(writer_tx, channel, &frame) {
848                PushSendOutcome::Sent => sent += 1,
849                PushSendOutcome::Backpressure => {
850                    queue.push_front((key, frame));
851                    break;
852                }
853                PushSendOutcome::PermanentFailure => {
854                    log::warn!(
855                        "subc attach: dropping retry-buffered reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
856                        key.root.as_path().display(),
857                        key.harness,
858                        key.session
859                    );
860                }
861            }
862        }
863
864        remove_empty = queue.is_empty();
865    }
866
867    if remove_empty {
868        retry_buffer.remove(&channel);
869    }
870
871    sent
872}
873
874fn drain_retry_buffers_for_bound_routes(
875    writer_tx: &mpsc::Sender<Frame>,
876    routes: &HashMap<RouteChannel, RouteIdentity>,
877    retry_buffer: &mut RetryBuffer,
878) -> usize {
879    let channels: Vec<RouteChannel> = routes.keys().copied().collect();
880    channels
881        .into_iter()
882        .map(|channel| drain_retry_buffer_for_channel(writer_tx, channel, retry_buffer))
883        .sum()
884}
885
886fn matching_route_channels(
887    routes: &HashMap<RouteChannel, RouteIdentity>,
888    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
889    root: &ProjectRootId,
890    frame: &PushFrame,
891) -> Vec<RouteChannel> {
892    let Some(channels) = root_channels.get(root) else {
893        return Vec::new();
894    };
895
896    let session = frame_session(frame);
897    let bash_observation = frame_is_bash_observation(frame);
898    channels
899        .iter()
900        .copied()
901        .filter(|channel| {
902            let Some(identity) = routes.get(channel) else {
903                return !bash_observation && session.is_none();
904            };
905            if bash_observation && !identity.trust.allows_bash_observation() {
906                return false;
907            }
908            match session {
909                Some(session) => identity.session == session,
910                None => true,
911            }
912        })
913        .collect()
914}
915
916fn buffer_detached_reliable_push_frame(
917    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
918    session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
919    root: &ProjectRootId,
920    frame: &PushFrame,
921) {
922    let Some(session) = frame_session(frame) else {
923        log::warn!(
924            "subc attach: dropping reliable project-scoped Push for root {} because no route is bound",
925            root.as_path().display()
926        );
927        return;
928    };
929
930    if let Some((key, trust)) = replay_key_for_session(session_identity, root, session) {
931        if frame_is_bash_observation(frame) && !trust.allows_bash_observation() {
932            return;
933        }
934        buffer_push_frame(push_buffer, key, frame.clone());
935    } else {
936        log::warn!(
937            "subc attach: dropping reliable Push for root {} session {} because no retained harness identity is known",
938            root.as_path().display(),
939            session
940        );
941    }
942}
943
944fn fan_out_lossy_push_frame(
945    writer_tx: &mpsc::Sender<Frame>,
946    routes: &HashMap<RouteChannel, RouteIdentity>,
947    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
948    root: &ProjectRootId,
949    frame: &PushFrame,
950) -> FanOutResult {
951    let matching_channels = matching_route_channels(routes, root_channels, root, frame);
952    let matched_channels = matching_channels.len();
953    if matched_channels == 0 {
954        return FanOutResult::default();
955    }
956
957    let body = match serde_json::to_vec(frame) {
958        Ok(body) => body,
959        Err(error) => {
960            log::warn!("subc attach: failed to serialize PushFrame for fan-out: {error}");
961            return FanOutResult {
962                matched_channels,
963                sent_frames: 0,
964            };
965        }
966    };
967
968    let sent_frames = matching_channels
969        .into_iter()
970        .filter(|&channel| {
971            matches!(
972                try_send_push_body(writer_tx, channel, &body),
973                PushSendOutcome::Sent
974            )
975        })
976        .count();
977
978    FanOutResult {
979        matched_channels,
980        sent_frames,
981    }
982}
983
984fn fan_out_reliable_push_frame(
985    writer_tx: &mpsc::Sender<Frame>,
986    routes: &HashMap<RouteChannel, RouteIdentity>,
987    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
988    session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
989    retry_buffer: &mut RetryBuffer,
990    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
991    root: &ProjectRootId,
992    frame: &PushFrame,
993) -> FanOutResult {
994    let matching_channels = matching_route_channels(routes, root_channels, root, frame);
995    let matched_channels = matching_channels.len();
996    if matched_channels == 0 {
997        buffer_detached_reliable_push_frame(push_buffer, session_identity, root, frame);
998        return FanOutResult::default();
999    }
1000
1001    let mut sent_frames = 0;
1002    for channel in matching_channels {
1003        let Some(identity) = routes.get(&channel) else {
1004            log::warn!(
1005                "subc attach: dropping reliable Push for stale route channel {channel} with no route identity"
1006            );
1007            continue;
1008        };
1009        let key = ReplayKey::from_identity(identity);
1010
1011        if retry_buffer
1012            .get(&channel)
1013            .is_some_and(|queue| !queue.is_empty())
1014        {
1015            buffer_retry_frame(retry_buffer, channel, key, frame.clone());
1016            continue;
1017        }
1018
1019        match try_send_push_frame(writer_tx, channel, frame) {
1020            PushSendOutcome::Sent => sent_frames += 1,
1021            PushSendOutcome::Backpressure => {
1022                buffer_retry_frame(retry_buffer, channel, key, frame.clone());
1023            }
1024            PushSendOutcome::PermanentFailure => {
1025                log::warn!(
1026                    "subc attach: dropping reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
1027                    key.root.as_path().display(),
1028                    key.harness,
1029                    key.session
1030                );
1031            }
1032        }
1033    }
1034
1035    FanOutResult {
1036        matched_channels,
1037        sent_frames,
1038    }
1039}
1040
1041fn process_reliable_push_frame(
1042    writer_tx: &mpsc::Sender<Frame>,
1043    routes: &HashMap<RouteChannel, RouteIdentity>,
1044    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
1045    session_identity: &HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
1046    retry_buffer: &mut RetryBuffer,
1047    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1048    completed_tasks: &mut CompletedTaskIds,
1049    root: ProjectRootId,
1050    frame: PushFrame,
1051) -> Option<(ProjectRootId, String)> {
1052    let completed_bg_session = completed_bg_session_key(&root, &frame);
1053    if let Some(task_id) = completed_task_id(&frame) {
1054        completed_tasks.remember(task_id);
1055    }
1056    let _ = fan_out_reliable_push_frame(
1057        writer_tx,
1058        routes,
1059        root_channels,
1060        session_identity,
1061        retry_buffer,
1062        push_buffer,
1063        &root,
1064        &frame,
1065    );
1066    completed_bg_session
1067}
1068
1069fn process_lossy_push_frame(
1070    writer_tx: &mpsc::Sender<Frame>,
1071    routes: &HashMap<RouteChannel, RouteIdentity>,
1072    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
1073    completed_tasks: &CompletedTaskIds,
1074    root: ProjectRootId,
1075    frame: PushFrame,
1076) {
1077    if should_drop_lossy_push(completed_tasks, &frame) {
1078        if let Some(task_id) = long_running_task_id(&frame) {
1079            log::debug!(
1080                "subc attach: dropping stale BashLongRunning Push for completed task {task_id}"
1081            );
1082        }
1083        return;
1084    }
1085
1086    let _ = fan_out_lossy_push_frame(writer_tx, routes, root_channels, &root, &frame);
1087}
1088
1089/// Sync command dispatch, passed in from `main` (the binary owns the command
1090/// table). Invoked only inside executor jobs in subc mode.
1091pub type DispatchFn = fn(RawRequest, &AppContext) -> Response;
1092
1093/// Entry point for `aft --subc <connection-file>`. Synchronous on the outside;
1094/// owns an isolated current-thread tokio runtime for the async transport.
1095/// Returns `Err` (fail-loud) on any connect/auth/protocol failure — we never
1096/// fall back to the standalone loop, to avoid split-brain index state.
1097pub fn run_subc_mode(
1098    connection_file_path: &Path,
1099    ctx: Arc<AppContext>,
1100    executor: Arc<Executor>,
1101    dispatch: DispatchFn,
1102    user_config_path: Option<PathBuf>,
1103) -> Result<(), SubcError> {
1104    // Production NEVER allows non-manifest tool names on route channels: AFT
1105    // fails closed and does not trust subc to enforce the manifest. The
1106    // test-only harness sets this through `run_subc_mode_for_test`.
1107    run_subc_mode_inner(
1108        connection_file_path,
1109        ctx,
1110        executor,
1111        dispatch,
1112        user_config_path,
1113        false,
1114    )
1115}
1116
1117fn run_subc_mode_inner(
1118    connection_file_path: &Path,
1119    ctx: Arc<AppContext>,
1120    executor: Arc<Executor>,
1121    dispatch: DispatchFn,
1122    user_config_path: Option<PathBuf>,
1123    allow_native_passthrough: bool,
1124) -> Result<(), SubcError> {
1125    let runtime = tokio::runtime::Builder::new_current_thread()
1126        .enable_all()
1127        .build()
1128        .map_err(SubcError::Runtime)?;
1129
1130    let executor_for_loop = Arc::clone(&executor);
1131    let loop_result = runtime.block_on(async move {
1132        let shared_app = ctx.app();
1133        drop(ctx);
1134        let stream = connect_and_authenticate(connection_file_path).await?;
1135        log::info!(
1136            "subc attach: authenticated to daemon via {}",
1137            connection_file_path.display()
1138        );
1139        let (read_half, write_half) = tokio::io::split(stream);
1140        run_module_loop(
1141            read_half,
1142            write_half,
1143            shared_app,
1144            executor_for_loop,
1145            dispatch,
1146            user_config_path,
1147            allow_native_passthrough,
1148        )
1149        .await
1150    });
1151
1152    for actor_ctx in executor.actor_contexts() {
1153        actor_ctx.lsp().shutdown_all();
1154        actor_ctx.bash_background().detach();
1155    }
1156
1157    loop_result
1158}
1159
1160/// Test-only entry that enables the non-manifest native-command passthrough on
1161/// route channels. Integration tests drive synthetic native commands (`glob`,
1162/// `callers`, `subc_test_echo_session`, …) through the executor to exercise
1163/// mechanics; production callers use [`run_subc_mode`], which fails closed.
1164#[doc(hidden)]
1165pub fn run_subc_mode_for_test(
1166    connection_file_path: &Path,
1167    ctx: Arc<AppContext>,
1168    executor: Arc<Executor>,
1169    dispatch: DispatchFn,
1170    user_config_path: Option<PathBuf>,
1171) -> Result<(), SubcError> {
1172    run_subc_mode_inner(
1173        connection_file_path,
1174        ctx,
1175        executor,
1176        dispatch,
1177        user_config_path,
1178        true,
1179    )
1180}
1181
1182/// Read the connection file → resolve the first endpoint → TCP connect → HMAC
1183/// handshake. Mirrors the reference `fake-aft-stub::connect_to_subc`.
1184async fn connect_and_authenticate(connection_file_path: &Path) -> Result<TcpStream, SubcError> {
1185    let conn = connection_file::read(connection_file_path).map_err(|source| {
1186        SubcError::ConnectionFile {
1187            path: connection_file_path.to_path_buf(),
1188            source,
1189        }
1190    })?;
1191
1192    let endpoint = conn
1193        .endpoints
1194        .first()
1195        .ok_or_else(|| SubcError::NoEndpoint {
1196            path: connection_file_path.to_path_buf(),
1197        })?;
1198    let endpoint_label = format!("{}:{}", endpoint.host, endpoint.port);
1199    let ip = endpoint
1200        .host
1201        .parse::<IpAddr>()
1202        .map_err(|_| SubcError::InvalidEndpoint {
1203            path: connection_file_path.to_path_buf(),
1204            endpoint: endpoint_label.clone(),
1205        })?;
1206    let addr = SocketAddr::new(ip, endpoint.port);
1207
1208    let mut stream = TcpStream::connect(addr)
1209        .await
1210        .map_err(|source| SubcError::Connect {
1211            endpoint: endpoint_label.clone(),
1212            source,
1213        })?;
1214
1215    authenticate_client(&mut stream, &conn, AUTH_DEADLINE)
1216        .await
1217        .map_err(|source| SubcError::Auth {
1218            endpoint: endpoint_label,
1219            source,
1220        })?;
1221
1222    Ok(stream)
1223}
1224
1225/// ModuleHello → HelloAck → control/route loop. Runs until the daemon closes
1226/// the connection (EOF), sends channel-0 Goodbye, or a fatal mutating executor
1227/// response requests whole-connection teardown.
1228async fn run_module_loop<R, W>(
1229    mut read: R,
1230    mut write: W,
1231    shared_app: Arc<App>,
1232    executor: Arc<Executor>,
1233    dispatch: DispatchFn,
1234    user_config_path: Option<PathBuf>,
1235    allow_native_passthrough: bool,
1236) -> Result<(), SubcError>
1237where
1238    R: AsyncRead + Unpin + Send + 'static,
1239    W: AsyncWrite + Unpin + Send + 'static,
1240{
1241    // ModuleHello: register as a tool provider. control_ops:None = full baseline.
1242    // Echo the one-time launch nonce the daemon injected via SUBC_LAUNCH_NONCE so a
1243    // reserved module_id's HELLO is accepted; absent for non-reserved/self-connect.
1244    let hello = ModuleHelloBody {
1245        manifest: build_manifest(),
1246        protocol_ver: PROTOCOL_VERSION,
1247        control_ops: None,
1248        launch_nonce: std::env::var("SUBC_LAUNCH_NONCE").ok(),
1249    };
1250    let hello_frame = Frame::build(
1251        FrameType::Hello,
1252        control_flags(),
1253        0,
1254        HELLO_CORR,
1255        serde_json::to_vec(&hello).map_err(SubcError::Json)?,
1256    )
1257    .map_err(SubcError::FrameBuild)?;
1258    write_frame(&mut write, &hello_frame)
1259        .await
1260        .map_err(SubcError::FrameIo)?;
1261
1262    // Expect HelloAck (registered) or a channel-0 Error (manifest/version reject).
1263    match read_frame(&mut read).await.map_err(SubcError::FrameIo)? {
1264        None => return Err(SubcError::ClosedBeforeHelloAck),
1265        Some(frame) => match frame.header.ty {
1266            FrameType::HelloAck => {
1267                log::info!("subc attach: registered (HelloAck received)");
1268            }
1269            FrameType::Error => {
1270                let body = serde_json::from_slice::<ErrorBody>(&frame.body).ok();
1271                return Err(SubcError::HelloRejected { body });
1272            }
1273            other => return Err(SubcError::UnexpectedFrame { ty: other }),
1274        },
1275    }
1276
1277    let (writer_tx, writer_rx) = mpsc::channel::<Frame>(256);
1278    let writer_task = spawn_writer_task(write, writer_rx);
1279    // `read_frame` is NOT cancellation-safe, so it must never sit directly inside
1280    // the `select!` below: a drain-interval tick (or shutdown) firing while a
1281    // frame is mid-transit would drop the partially-consumed bytes and desync the
1282    // stream (the next read would parse a body byte as a frame header). A
1283    // dedicated reader task owns the socket, reads whole frames sequentially, and
1284    // forwards them over a channel; the loop selects on the cancel-safe `recv()`.
1285    let (reader_tx, mut reader_rx) = mpsc::channel::<Result<Frame, SubcError>>(256);
1286    let reader_task = spawn_reader_task(read, reader_tx);
1287    let shutdown = Arc::new(Notify::new());
1288    let mut drain_interval = tokio::time::interval(Duration::from_millis(250));
1289    let (maintenance_tx, mut maintenance_rx) = mpsc::channel::<MaintenanceCompletion>(256);
1290    let (bash_deferred_tx, mut bash_deferred_rx) = mpsc::channel::<BashDeferredCompletion>(256);
1291    let (bash_poll_touch_tx, mut bash_poll_touch_rx) = mpsc::channel::<ProjectRootId>(256);
1292    let (control_completion_tx, mut control_completion_rx) =
1293        mpsc::channel::<RouteBindCompletion>(256);
1294    let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1024);
1295    let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
1296    let push_senders = PushSenders {
1297        lossy_tx,
1298        reliable_tx,
1299    };
1300    let connection_cancel = PersistentCancelSignal::new();
1301    let mut routes: HashMap<RouteChannel, RouteIdentity> = HashMap::new();
1302    let mut bg_subs: HashMap<RouteChannel, BgSub> = HashMap::new();
1303    let mut bg_sub_by_session: HashMap<(ProjectRootId, String), RouteChannel> = HashMap::new();
1304    let mut bg_wake_pending: HashSet<RouteChannel> = HashSet::new();
1305    let mut bg_wake_epoch: HashMap<(ProjectRootId, String), u64> = HashMap::new();
1306    let mut root_channels: HashMap<ProjectRootId, HashSet<RouteChannel>> = HashMap::new();
1307    let mut session_identity: HashMap<(ProjectRootId, String), RetainedSessionIdentity> =
1308        HashMap::new();
1309    let mut push_buffer: HashMap<ReplayKey, VecDeque<PushFrame>> = HashMap::new();
1310    let mut retry_buffer: RetryBuffer = HashMap::new();
1311    let mut completed_tasks = CompletedTaskIds::default();
1312    let mut live_roots: HashMap<ProjectRootId, RootMeta> = HashMap::new();
1313    let mut pending_binds: HashMap<RouteChannel, PendingBind> = HashMap::new();
1314    let mut route_bash_cancels: HashMap<RouteChannel, RouteBashCancel> = HashMap::new();
1315
1316    let loop_result: Result<(), SubcError> = loop {
1317        tokio::select! {
1318            _ = shutdown.notified() => {
1319                log::warn!("subc attach: fatal executor response requested teardown");
1320                break Ok(());
1321            }
1322            maybe_frame = reader_rx.recv() => {
1323                let frame = match maybe_frame {
1324                    None => {
1325                        log::info!("subc attach: daemon closed connection");
1326                        break Ok(());
1327                    }
1328                    Some(Err(error)) => break Err(error),
1329                    Some(Ok(frame)) => frame,
1330                };
1331
1332                match frame.header.ty {
1333                    FrameType::Ping if frame.header.channel == 0 => {
1334                        let pong = match Frame::build_with_version(
1335                            frame.header.ver,
1336                            FrameType::Pong,
1337                            frame.header.flags,
1338                            0,
1339                            frame.header.corr,
1340                            Vec::new(),
1341                        ) {
1342                            Ok(pong) => pong,
1343                            Err(error) => break Err(SubcError::FrameBuild(error)),
1344                        };
1345                        if let Err(error) = send_frame(&writer_tx, pong).await {
1346                            break Err(error);
1347                        }
1348                    }
1349                    FrameType::Goodbye if frame.header.channel == 0 => {
1350                        log::info!("subc attach: received channel-0 Goodbye");
1351                        break Ok(());
1352                    }
1353                    FrameType::Goodbye => {
1354                        let channel = route_key(frame.header.channel);
1355                        end_bg_subscription(
1356                            &writer_tx,
1357                            &mut bg_subs,
1358                            &mut bg_sub_by_session,
1359                            &mut bg_wake_pending,
1360                            channel,
1361                            routes.get(&channel),
1362                        );
1363                        if let Some(cancel) = route_bash_cancels.remove(&channel) {
1364                            cancel.token.cancel();
1365                        }
1366                        if let Some(pending) = pending_binds.get_mut(&channel) {
1367                            pending.cancelled = true;
1368                            log::debug!(
1369                                "subc attach: cancelled pending RouteBind for route {} on Goodbye",
1370                                frame.header.channel
1371                            );
1372                        }
1373                        let migrated = migrate_retry_buffer_to_push_buffer(
1374                            &mut retry_buffer,
1375                            channel,
1376                            &mut push_buffer,
1377                        );
1378                        if let Some(identity) = remove_route_channel(&mut routes, &mut root_channels, channel) {
1379                            if migrated > 0 {
1380                                log::debug!(
1381                                    "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from route {} into detach replay",
1382                                    frame.header.channel
1383                                );
1384                            }
1385                            if let Some(meta) = live_roots.get_mut(&identity.root) {
1386                                let idle_for = meta.last_touched.elapsed();
1387                                meta.touch();
1388                                log::debug!(
1389                                    "subc attach: route {} torn down for root {} harness {} session {} (last touched {:?} ago)",
1390                                    frame.header.channel,
1391                                    identity.root.as_path().display(),
1392                                    identity.harness,
1393                                    identity.session,
1394                                    idle_for
1395                                );
1396                            } else {
1397                                log::debug!(
1398                                    "subc attach: route {} torn down for root {} harness {} session {}",
1399                                    frame.header.channel,
1400                                    identity.root.as_path().display(),
1401                                    identity.harness,
1402                                    identity.session
1403                                );
1404                            }
1405                        } else {
1406                            if migrated > 0 {
1407                                log::debug!(
1408                                    "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from unbound route {} into detach replay",
1409                                    frame.header.channel
1410                                );
1411                            }
1412                            log::debug!("subc attach: unbound route {} torn down", frame.header.channel);
1413                        }
1414                    }
1415                    FrameType::Request if frame.header.channel == 0 => {
1416                        if let Err(error) = handle_control_request(
1417                            &writer_tx,
1418                            &frame,
1419                            &shared_app,
1420                            &executor,
1421                            &mut live_roots,
1422                            &mut pending_binds,
1423                            &control_completion_tx,
1424                            &push_senders,
1425                            dispatch,
1426                            user_config_path.as_deref(),
1427                        )
1428                        .await
1429                        {
1430                            break Err(error);
1431                        }
1432                    }
1433                    FrameType::Request => {
1434                        if let Err(error) = handle_tool_call(
1435                            &writer_tx,
1436                            &frame,
1437                            &routes,
1438                            &pending_binds,
1439                            &mut live_roots,
1440                            &executor,
1441                            &shutdown,
1442                            &connection_cancel,
1443                            &bash_deferred_tx,
1444                            &bash_poll_touch_tx,
1445                            &mut route_bash_cancels,
1446                            &mut bg_subs,
1447                            &mut bg_sub_by_session,
1448                            &mut bg_wake_pending,
1449                            &mut bg_wake_epoch,
1450                            dispatch,
1451                            allow_native_passthrough,
1452                        )
1453                        .await
1454                        {
1455                            break Err(error);
1456                        }
1457                    }
1458                    FrameType::Cancel => {
1459                        let channel = route_key(frame.header.channel);
1460                        if bg_subs.contains_key(&channel) {
1461                            end_bg_subscription(
1462                                &writer_tx,
1463                                &mut bg_subs,
1464                                &mut bg_sub_by_session,
1465                                &mut bg_wake_pending,
1466                                channel,
1467                                routes.get(&channel),
1468                            );
1469                        }
1470                    }
1471                    // Push/etc. are not handled on ingress. In-flight tool-call
1472                    // cancellation is not implemented, so non-bg_events Cancels
1473                    // and unrelated frame types are ignored rather than acted on.
1474                    _ => {}
1475                }
1476            }
1477            Some((root_id, frame)) = reliable_rx.recv() => {
1478                // Drain reliable frames in FIFO order. They are intentionally not
1479                // coalesced: completion, pattern-match, and warning frames are
1480                // must-deliver events.
1481                let mut batch = vec![(root_id, frame)];
1482                while let Ok(item) = reliable_rx.try_recv() {
1483                    batch.push(item);
1484                }
1485
1486                for (root, frame) in batch {
1487                    if let Some((root, session)) = process_reliable_push_frame(
1488                        &writer_tx,
1489                        &routes,
1490                        &root_channels,
1491                        &session_identity,
1492                        &mut retry_buffer,
1493                        &mut push_buffer,
1494                        &mut completed_tasks,
1495                        root,
1496                        frame,
1497                    ) {
1498                        if let Some(channel) = bg_sub_by_session
1499                            .get(&(root.clone(), session.clone()))
1500                            .copied()
1501                        {
1502                            arm_bg_wake(
1503                                root,
1504                                session,
1505                                channel,
1506                                &mut bg_wake_pending,
1507                                &mut bg_wake_epoch,
1508                            );
1509                        }
1510                    }
1511                }
1512            }
1513            Some((root_id, frame)) = lossy_rx.recv() => {
1514                // If both lanes are ready, process any already-queued reliable
1515                // completions first so a following stale BashLongRunning frame can
1516                // be suppressed even if select! happened to wake on the lossy lane.
1517                while let Ok((reliable_root, reliable_frame)) = reliable_rx.try_recv() {
1518                    if let Some((root, session)) = process_reliable_push_frame(
1519                        &writer_tx,
1520                        &routes,
1521                        &root_channels,
1522                        &session_identity,
1523                        &mut retry_buffer,
1524                        &mut push_buffer,
1525                        &mut completed_tasks,
1526                        reliable_root,
1527                        reliable_frame,
1528                    ) {
1529                        if let Some(channel) = bg_sub_by_session
1530                            .get(&(root.clone(), session.clone()))
1531                            .copied()
1532                        {
1533                            arm_bg_wake(
1534                                root,
1535                                session,
1536                                channel,
1537                                &mut bg_wake_pending,
1538                                &mut bg_wake_epoch,
1539                            );
1540                        }
1541                    }
1542                }
1543
1544                // Drain the currently queued burst in one loop turn so lossy
1545                // status/progress classes coalesce before reaching subc's shared
1546                // egress queue.
1547                let mut batch = vec![(root_id, frame)];
1548                while let Ok(item) = lossy_rx.try_recv() {
1549                    batch.push(item);
1550                }
1551
1552                for (root, frame) in coalesce_push_batch(batch) {
1553                    process_lossy_push_frame(
1554                        &writer_tx,
1555                        &routes,
1556                        &root_channels,
1557                        &completed_tasks,
1558                        root,
1559                        frame,
1560                    );
1561                }
1562            }
1563            Some(completion) = control_completion_rx.recv() => {
1564                if let Err(error) = handle_route_bind_completion(
1565                    &writer_tx,
1566                    completion,
1567                    &mut routes,
1568                    &mut root_channels,
1569                    &mut session_identity,
1570                    &mut push_buffer,
1571                    &mut live_roots,
1572                    &mut pending_binds,
1573                    &executor,
1574                    &shutdown,
1575                )
1576                .await
1577                {
1578                    break Err(error);
1579                }
1580            }
1581            Some(done) = bash_deferred_rx.recv() => {
1582                if let Err(error) = handle_bash_deferred_completion(
1583                    &writer_tx,
1584                    done,
1585                    &routes,
1586                    &mut live_roots,
1587                    &mut route_bash_cancels,
1588                    &shutdown,
1589                )
1590                .await
1591                {
1592                    break Err(error);
1593                }
1594            }
1595            Some(root_id) = bash_poll_touch_rx.recv() => {
1596                if let Some(meta) = live_roots.get_mut(&root_id) {
1597                    meta.touch();
1598                }
1599            }
1600            Some(completion) = maintenance_rx.recv() => {
1601                let root_id = completion.root_id;
1602                let response = completion.response;
1603                if let Some(meta) = live_roots.get_mut(&root_id) {
1604                    meta.maintenance_pending = false;
1605                }
1606                clear_stale_bg_wakes_for_empty_sessions(
1607                    &root_id,
1608                    &completion.empty_bg_sessions,
1609                    &bg_sub_by_session,
1610                    &mut bg_wake_pending,
1611                    &bg_wake_epoch,
1612                );
1613                if response_is_fatal_panic(&response) {
1614                    signal_fatal_teardown(&writer_tx, None, PROTOCOL_VERSION, 0, &shutdown).await;
1615                }
1616            }
1617            _ = drain_interval.tick() => {
1618                emit_bg_event_wakes(&writer_tx, &bg_subs, &mut bg_wake_pending);
1619
1620                let retried = drain_retry_buffers_for_bound_routes(
1621                    &writer_tx,
1622                    &routes,
1623                    &mut retry_buffer,
1624                );
1625                if retried > 0 {
1626                    log::debug!(
1627                        "subc attach: retried {retried} reliable Push frame(s) after writer backpressure"
1628                    );
1629                }
1630
1631                let due_roots: Vec<ProjectRootId> = live_roots
1632                    .iter_mut()
1633                    .filter_map(|(root_id, meta)| {
1634                        if meta.maintenance_pending {
1635                            None
1636                        } else {
1637                            meta.maintenance_pending = true;
1638                            Some(root_id.clone())
1639                        }
1640                    })
1641                    .collect();
1642                for root_id in due_roots {
1643                    let bg_sessions_to_check: Vec<(String, u64)> = bg_sub_by_session
1644                        .iter()
1645                        .filter_map(|((root, session), _)| {
1646                            if root == &root_id {
1647                                Some((
1648                                    session.clone(),
1649                                    bg_wake_epoch
1650                                        .get(&(root_id.clone(), session.clone()))
1651                                        .copied()
1652                                        .unwrap_or(0),
1653                                ))
1654                            } else {
1655                                None
1656                            }
1657                        })
1658                        .collect();
1659                    submit_maintenance_drain(
1660                        &executor,
1661                        root_id,
1662                        bg_sessions_to_check,
1663                        &maintenance_tx,
1664                    );
1665                }
1666            }
1667        }
1668    };
1669
1670    // The reader task may be parked on `read_frame`; abort it (we are done with
1671    // the connection) and flush the writer.
1672    connection_cancel.cancel();
1673    reader_task.abort();
1674    drop(writer_tx);
1675    let writer_result = finish_writer_task(writer_task).await;
1676    loop_result.and(writer_result)
1677}
1678
1679fn spawn_writer_task<W>(
1680    mut write: W,
1681    mut rx: mpsc::Receiver<Frame>,
1682) -> JoinHandle<Result<(), subc_transport::FrameIoError>>
1683where
1684    W: AsyncWrite + Unpin + Send + 'static,
1685{
1686    tokio::spawn(async move {
1687        while let Some(frame) = rx.recv().await {
1688            write_frame(&mut write, &frame).await?;
1689        }
1690        Ok(())
1691    })
1692}
1693
1694/// Owns the read half and reads whole frames sequentially. `read_frame` is not
1695/// cancellation-safe, so it must run here — never inside the main loop's
1696/// `select!` — to keep the inbound stream framed. Each frame (or the terminal
1697/// error / EOF) is forwarded over `tx`; the loop consumes them via cancel-safe
1698/// `recv()`. Exits on EOF (Ok(None)), a read error, or when `tx` is dropped
1699/// (the loop ended and aborted us).
1700fn spawn_reader_task<R>(mut read: R, tx: mpsc::Sender<Result<Frame, SubcError>>) -> JoinHandle<()>
1701where
1702    R: AsyncRead + Unpin + Send + 'static,
1703{
1704    tokio::spawn(async move {
1705        loop {
1706            match read_frame(&mut read).await {
1707                Ok(Some(frame)) => {
1708                    if tx.send(Ok(frame)).await.is_err() {
1709                        return;
1710                    }
1711                }
1712                Ok(None) => {
1713                    // EOF: let the loop observe channel close as "daemon closed".
1714                    return;
1715                }
1716                Err(error) => {
1717                    let _ = tx.send(Err(SubcError::FrameIo(error))).await;
1718                    return;
1719                }
1720            }
1721        }
1722    })
1723}
1724
1725async fn finish_writer_task(
1726    mut writer_task: JoinHandle<Result<(), subc_transport::FrameIoError>>,
1727) -> Result<(), SubcError> {
1728    match tokio::time::timeout(Duration::from_millis(100), &mut writer_task).await {
1729        Ok(Ok(Ok(()))) => Ok(()),
1730        Ok(Ok(Err(error))) => Err(SubcError::FrameIo(error)),
1731        Ok(Err(error)) => Err(SubcError::WriterJoin(error)),
1732        Err(_) => {
1733            writer_task.abort();
1734            Ok(())
1735        }
1736    }
1737}
1738
1739async fn send_frame(tx: &mpsc::Sender<Frame>, frame: Frame) -> Result<(), SubcError> {
1740    match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(frame)).await {
1741        Ok(Ok(())) => Ok(()),
1742        Ok(Err(_)) => Err(SubcError::WriterClosed),
1743        Err(_) => Err(SubcError::WriterBackpressureTimeout),
1744    }
1745}
1746
1747fn rollback_pending_bind_actor(
1748    executor: &Arc<Executor>,
1749    live_roots: &HashMap<ProjectRootId, RootMeta>,
1750    root_id: &ProjectRootId,
1751    inserted_new_actor: bool,
1752) {
1753    if inserted_new_actor && !live_roots.contains_key(root_id) {
1754        executor.remove_actor(root_id);
1755    }
1756}
1757
1758async fn handle_route_bind_completion(
1759    tx: &mpsc::Sender<Frame>,
1760    completion: RouteBindCompletion,
1761    routes: &mut HashMap<RouteChannel, RouteIdentity>,
1762    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
1763    session_identity: &mut HashMap<(ProjectRootId, String), RetainedSessionIdentity>,
1764    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1765    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1766    pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1767    executor: &Arc<Executor>,
1768    shutdown: &Arc<Notify>,
1769) -> Result<(), SubcError> {
1770    let route_id = route_key(completion.route_channel);
1771    let Some(pending) = pending_binds.remove(&route_id) else {
1772        log::warn!(
1773            "subc attach: dropping RouteBind completion for non-pending route {}",
1774            completion.route_channel
1775        );
1776        rollback_pending_bind_actor(
1777            executor,
1778            live_roots,
1779            &completion.bind_root_id,
1780            completion.inserted_new_actor,
1781        );
1782        return Ok(());
1783    };
1784
1785    if pending.bind_root_id != completion.bind_root_id {
1786        log::warn!(
1787            "subc attach: pending RouteBind root mismatch for route {} (pending {} completion {})",
1788            completion.route_channel,
1789            pending.bind_root_id.as_path().display(),
1790            completion.bind_root_id.as_path().display()
1791        );
1792    }
1793
1794    let inserted_new_actor = pending.inserted_new_actor || completion.inserted_new_actor;
1795    if pending.cancelled {
1796        rollback_pending_bind_actor(
1797            executor,
1798            live_roots,
1799            &completion.bind_root_id,
1800            inserted_new_actor,
1801        );
1802        log::debug!(
1803            "subc attach: discarded completed RouteBind for cancelled route {} root {}",
1804            completion.route_channel,
1805            completion.bind_root_id.as_path().display()
1806        );
1807        return Ok(());
1808    }
1809
1810    let failure = if !completion.configure_response.success {
1811        Some((
1812            &completion.configure_response,
1813            "configure failed during route bind",
1814        ))
1815    } else if let Some(drain_response) = completion.drain_response.as_ref() {
1816        if drain_response.success {
1817            None
1818        } else {
1819            Some((
1820                drain_response,
1821                "build-completion drain failed during route bind",
1822            ))
1823        }
1824    } else {
1825        None
1826    };
1827
1828    if let Some((response, fallback)) = failure {
1829        rollback_pending_bind_actor(
1830            executor,
1831            live_roots,
1832            &completion.bind_root_id,
1833            inserted_new_actor,
1834        );
1835        let message = response_message(response, fallback);
1836        let fatal = response_is_fatal_panic(response);
1837        send_route_bind_error_parts(
1838            tx,
1839            completion.ver,
1840            completion.corr,
1841            completion.flags,
1842            "config_divergence",
1843            &message,
1844        )
1845        .await?;
1846        if fatal {
1847            signal_fatal_teardown(
1848                tx,
1849                Some(completion.route_channel),
1850                completion.ver,
1851                completion.corr,
1852                shutdown,
1853            )
1854            .await;
1855        }
1856        return Ok(());
1857    }
1858
1859    remember_session_identity(session_identity, &completion.identity);
1860    let replay_key = ReplayKey::from_identity(&completion.identity);
1861    let bind_trust = completion.identity.trust;
1862    insert_route_channel(routes, root_channels, route_id, completion.identity);
1863    live_roots
1864        .entry(completion.bind_root_id.clone())
1865        .and_modify(|meta| {
1866            meta.touch();
1867            meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1868        })
1869        .or_insert_with(|| RootMeta::new(Instant::now()));
1870    if let Some(meta) = live_roots.get_mut(&completion.bind_root_id) {
1871        meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1872    }
1873
1874    let ack =
1875        serde_json::to_vec(&ModuleControlResponse::RouteBindAck {}).map_err(SubcError::Json)?;
1876    let response = Frame::build_with_version(
1877        completion.ver,
1878        FrameType::Response,
1879        control_flags(),
1880        0,
1881        completion.corr,
1882        ack,
1883    )
1884    .map_err(SubcError::FrameBuild)?;
1885    send_frame(tx, response).await?;
1886    let replayed = replay_buffered_push_frames(tx, route_id, push_buffer, &replay_key, bind_trust);
1887    if replayed > 0 {
1888        log::debug!(
1889            "subc attach: replayed {} buffered Push frame(s) to route {} root {} harness {} session {}",
1890            replayed,
1891            completion.route_channel,
1892            replay_key.root.as_path().display(),
1893            replay_key.harness,
1894            replay_key.session
1895        );
1896    }
1897    log::info!(
1898        "subc attach: route {} bound to root {}",
1899        completion.route_channel,
1900        completion.bind_root_id.as_path().display()
1901    );
1902    Ok(())
1903}
1904
1905/// channel-0 control request — currently only RouteBind. Reconciles the route's
1906/// RootConfig through the executor's Mutating lane and resolves completion on a
1907/// loop-owned control-completion channel so slow configure jobs do not block the
1908/// transport loop.
1909async fn handle_control_request(
1910    tx: &mpsc::Sender<Frame>,
1911    frame: &Frame,
1912    shared_app: &Arc<App>,
1913    executor: &Arc<Executor>,
1914    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1915    pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1916    control_completion_tx: &mpsc::Sender<RouteBindCompletion>,
1917    push_senders: &PushSenders,
1918    dispatch: DispatchFn,
1919    user_config_path: Option<&Path>,
1920) -> Result<(), SubcError> {
1921    let request =
1922        serde_json::from_slice::<ModuleControlRequest>(&frame.body).map_err(SubcError::Json)?;
1923    match request {
1924        ModuleControlRequest::RouteBind {
1925            route_channel,
1926            target: _,
1927            identity,
1928            principal,
1929        } => {
1930            let route_id = route_key(route_channel);
1931            if pending_binds.contains_key(&route_id) {
1932                return send_route_bind_error(
1933                    tx,
1934                    frame,
1935                    "config_divergence",
1936                    "route bind is already pending for channel",
1937                )
1938                .await;
1939            }
1940
1941            let bind_root_id = match ProjectRootId::from_path(&identity.project_root) {
1942                Ok(root_id) => root_id,
1943                Err(error) => {
1944                    return send_route_bind_error(
1945                        tx,
1946                        frame,
1947                        "config_divergence",
1948                        &format!("invalid route project root: {error}"),
1949                    )
1950                    .await;
1951                }
1952            };
1953
1954            // Reconcile RootConfig: build a configure request from the bind
1955            // identity + forwarded config tiers and run it through the executor.
1956            let request_id = format!("subc-bind-{route_channel}");
1957            let bind_project_root = identity.project_root.clone();
1958            let bind_harness = identity.harness.clone();
1959            let bind_session = identity.session.clone();
1960            let bind_trust = trust_for_principal(&principal);
1961            log::info!(
1962                "subc attach: route {} principal={} trust={}",
1963                route_channel,
1964                principal_label(&principal),
1965                bind_trust.label()
1966            );
1967
1968            // Config is single-per-project, read by AFT directly from the
1969            // CortexKit config files (user: ~/.config/cortexkit/aft.jsonc,
1970            // project: <root>/.cortexkit/aft.jsonc). Wire-relayed config tiers are
1971            // IGNORED entirely: a front (runner or mcp:*) cannot push config over
1972            // the wire. This is what makes config harness-INDEPENDENT — every
1973            // harness binding a project gets the identical on-disk config, so two
1974            // trust domains sharing the per-root actor can never diverge or
1975            // inherit each other's capabilities (the cross-bind escalation class).
1976            // Wire-relayed config tiers (if the protocol still carries them) are
1977            // ignored entirely; the per-tier trust boundary (user trusted, project
1978            // privileged-dropped) is applied to the FILE tiers in handle_configure.
1979            let local_tiers = crate::subc_config::read_local_cortexkit_config_tiers(
1980                user_config_path,
1981                Path::new(&bind_project_root),
1982            );
1983            let config_tiers: Vec<Value> = local_tiers
1984                .iter()
1985                .map(|t| json!({ "tier": t.tier, "source": t.source, "doc": t.doc }))
1986                .collect();
1987            let diagnostics_on_edit = diagnostics_on_edit_from_tiers(&local_tiers);
1988            let configure_json = json!({
1989                "id": request_id,
1990                "command": "configure",
1991                "project_root": bind_project_root,
1992                "harness": bind_harness,
1993                "session_id": bind_session.clone(),
1994                "config": config_tiers,
1995            });
1996            let configure_req = match serde_json::from_value::<RawRequest>(configure_json) {
1997                Ok(req) => req,
1998                Err(error) => {
1999                    return send_route_bind_error(
2000                        tx,
2001                        frame,
2002                        "config_divergence",
2003                        &format!("failed to build configure request: {error}"),
2004                    )
2005                    .await;
2006                }
2007            };
2008
2009            let route_identity = RouteIdentity {
2010                root: bind_root_id.clone(),
2011                project_root: PathBuf::from(&bind_project_root),
2012                harness: bind_harness.clone(),
2013                session: bind_session.clone(),
2014                trust: bind_trust,
2015            };
2016            let configure_session = route_identity.session.clone();
2017            let root_was_live = live_roots.contains_key(&bind_root_id);
2018            let inserted_new_actor = if root_was_live {
2019                log::debug!(
2020                    "subc attach: reusing actor for route {} root {}",
2021                    route_channel,
2022                    bind_root_id.as_path().display()
2023                );
2024                false
2025            } else {
2026                let actor_ctx = Arc::new(AppContext::from_app(
2027                    Arc::clone(shared_app),
2028                    Config::default(),
2029                ));
2030                install_bash_compressor(&actor_ctx);
2031                actor_ctx.set_progress_sender(Some(progress_sender_for_root(
2032                    push_senders.clone(),
2033                    bind_root_id.clone(),
2034                )));
2035                let inserted =
2036                    executor.register_actor(bind_root_id.clone(), Arc::clone(&actor_ctx));
2037                drop(actor_ctx);
2038                // Do not insert into live_roots until configure succeeds: live_roots
2039                // drives maintenance, and a half-configured new actor must not be
2040                // maintenance-eligible before its route/session identity exists.
2041                log::debug!(
2042                    "subc attach: registered actor for route {} root {}",
2043                    route_channel,
2044                    bind_root_id.as_path().display()
2045                );
2046                inserted
2047            };
2048
2049            pending_binds.insert(
2050                route_id,
2051                PendingBind {
2052                    bind_root_id: bind_root_id.clone(),
2053                    inserted_new_actor,
2054                    cancelled: false,
2055                },
2056            );
2057
2058            let configure_request_id = configure_req.id.clone();
2059            let configure_rx = executor.submit_async(
2060                bind_root_id.clone(),
2061                Lane::Mutating,
2062                configure_request_id.clone(),
2063                Box::new(move |ctx| {
2064                    log_ctx::with_session(Some(configure_session.clone()), || {
2065                        dispatch(configure_req, ctx)
2066                    })
2067                }),
2068            );
2069
2070            let completion_tx = control_completion_tx.clone();
2071            let completion_executor = Arc::clone(executor);
2072            let completion_identity = route_identity;
2073            let completion_root = bind_root_id.clone();
2074            let completion_route_channel = route_channel;
2075            let completion_ver = frame.header.ver;
2076            let completion_corr = frame.header.corr;
2077            let completion_flags = frame.header.flags;
2078            tokio::spawn(async move {
2079                let configure_response =
2080                    await_executor_response(configure_rx, configure_request_id.clone()).await;
2081                let drain_response = if configure_response.success && !root_was_live {
2082                    let drain_request_id = format!("subc-bind-drain-{completion_route_channel}");
2083                    let drain_response_id = drain_request_id.clone();
2084                    let drain_rx = completion_executor.submit_async(
2085                        completion_root.clone(),
2086                        Lane::Mutating,
2087                        drain_request_id.clone(),
2088                        Box::new(move |ctx| {
2089                            runtime_drain::drain_build_completions(ctx);
2090                            Response::success(drain_response_id, json!({ "drained": true }))
2091                        }),
2092                    );
2093                    Some(await_executor_response(drain_rx, drain_request_id).await)
2094                } else {
2095                    None
2096                };
2097
2098                let completion = RouteBindCompletion {
2099                    route_channel: completion_route_channel,
2100                    identity: completion_identity,
2101                    bind_root_id: completion_root,
2102                    inserted_new_actor,
2103                    configure_response,
2104                    drain_response,
2105                    diagnostics_on_edit,
2106                    ver: completion_ver,
2107                    corr: completion_corr,
2108                    flags: completion_flags,
2109                };
2110                if completion_tx.send(completion).await.is_err() {
2111                    log::debug!(
2112                        "subc attach: dropped RouteBind completion for route {} after loop exit",
2113                        completion_route_channel
2114                    );
2115                }
2116            });
2117
2118            Ok(())
2119        }
2120    }
2121}
2122
2123fn install_bash_compressor(ctx: &AppContext) {
2124    // Mirrors main.rs per-actor compressor installation for subc-created actors.
2125    let filter_registry_handle = ctx.shared_filter_registry();
2126    let compress_flag = ctx.bash_compress_flag();
2127    ctx.bash_background().set_compressor_with_exit_code(
2128        move |command: &str, output: String, exit_code: Option<i32>| {
2129            if !compress_flag.load(std::sync::atomic::Ordering::Relaxed) {
2130                return crate::compress::CompressionResult::new(output);
2131            }
2132            let registry_guard = match filter_registry_handle.read() {
2133                Ok(g) => g,
2134                Err(poisoned) => poisoned.into_inner(),
2135            };
2136            crate::compress::compress_with_registry_exit_code(
2137                command,
2138                &output,
2139                exit_code,
2140                &registry_guard,
2141            )
2142        },
2143    );
2144}
2145
2146fn diagnostics_on_edit_from_tiers(tiers: &[ConfigTier]) -> bool {
2147    let mut diagnostics_on_edit = false;
2148    for tier in tiers {
2149        if let Some(value) = diagnostics_on_edit_from_doc(&tier.doc) {
2150            diagnostics_on_edit = value;
2151        }
2152    }
2153    diagnostics_on_edit
2154}
2155
2156fn diagnostics_on_edit_from_doc(doc: &str) -> Option<bool> {
2157    let stripped = strip_jsonc(doc);
2158    let value = serde_json::from_str::<Value>(&stripped).ok()?;
2159    value
2160        .get("lsp")
2161        .and_then(Value::as_object)?
2162        .get("diagnostics_on_edit")
2163        .and_then(Value::as_bool)
2164}
2165
2166async fn send_route_bind_error(
2167    tx: &mpsc::Sender<Frame>,
2168    frame: &Frame,
2169    code: &str,
2170    message: &str,
2171) -> Result<(), SubcError> {
2172    send_route_bind_error_parts(
2173        tx,
2174        frame.header.ver,
2175        frame.header.corr,
2176        frame.header.flags,
2177        code,
2178        message,
2179    )
2180    .await
2181}
2182
2183async fn send_route_bind_error_parts(
2184    tx: &mpsc::Sender<Frame>,
2185    ver: u8,
2186    corr: u64,
2187    flags: Flags,
2188    code: &str,
2189    message: &str,
2190) -> Result<(), SubcError> {
2191    let response = build_error_frame(ver, 0, corr, flags, code, message)?;
2192    send_frame(tx, response).await?;
2193    log::warn!("subc attach: route bind rejected ({code}): {message}");
2194    Ok(())
2195}
2196
2197/// Route-channel tool call: `{name, arguments}` → executor lane → dispatch to
2198/// the sync command core → wrap the structured Response in a CallToolResult
2199/// `{content, isError}`. v1 mapping: the whole `{success, ...}` Response
2200/// serialized into ONE text block; `isError` carries `success == false`.
2201async fn handle_tool_call(
2202    tx: &mpsc::Sender<Frame>,
2203    frame: &Frame,
2204    routes: &HashMap<RouteChannel, RouteIdentity>,
2205    pending_binds: &HashMap<RouteChannel, PendingBind>,
2206    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
2207    executor: &Arc<Executor>,
2208    shutdown: &Arc<Notify>,
2209    connection_cancel: &PersistentCancelSignal,
2210    bash_deferred_tx: &mpsc::Sender<BashDeferredCompletion>,
2211    bash_poll_touch_tx: &mpsc::Sender<ProjectRootId>,
2212    route_bash_cancels: &mut HashMap<RouteChannel, RouteBashCancel>,
2213    bg_subs: &mut HashMap<RouteChannel, BgSub>,
2214    bg_sub_by_session: &mut HashMap<(ProjectRootId, String), RouteChannel>,
2215    bg_wake_pending: &mut HashSet<RouteChannel>,
2216    bg_wake_epoch: &mut HashMap<(ProjectRootId, String), u64>,
2217    dispatch: DispatchFn,
2218    allow_native_passthrough: bool,
2219) -> Result<(), SubcError> {
2220    let route_id = route_key(frame.header.channel);
2221    if pending_binds.contains_key(&route_id) {
2222        let error = build_error_frame(
2223            frame.header.ver,
2224            frame.header.channel,
2225            frame.header.corr,
2226            frame.header.flags,
2227            "route_not_bound",
2228            "route is not bound before tool call",
2229        )?;
2230        return send_frame(tx, error).await;
2231    }
2232
2233    let Some(identity) = routes.get(&route_id).cloned() else {
2234        let error = build_error_frame(
2235            frame.header.ver,
2236            frame.header.channel,
2237            frame.header.corr,
2238            frame.header.flags,
2239            "route_not_bound",
2240            "route is not bound before tool call",
2241        )?;
2242        return send_frame(tx, error).await;
2243    };
2244    if let Some(meta) = live_roots.get_mut(&identity.root) {
2245        meta.touch();
2246    }
2247
2248    let is_bg_events_subscribe = serde_json::from_slice::<BgEventsProbe>(&frame.body)
2249        .ok()
2250        .and_then(|probe| probe.op)
2251        .as_deref()
2252        == Some("bg_events");
2253    if is_bg_events_subscribe {
2254        if let Some(old_sub) = bg_subs.get(&route_id).copied() {
2255            let _ = try_send_bg_stream_end(tx, route_id, &old_sub);
2256        }
2257        if !identity.trust.allows_bash_observation() {
2258            bg_subs.remove(&route_id);
2259            bg_wake_pending.remove(&route_id);
2260            remove_bg_subscription_index(bg_sub_by_session, route_id, Some(&identity));
2261            return Ok(());
2262        }
2263        bg_subs.insert(
2264            route_id,
2265            BgSub {
2266                corr: frame.header.corr,
2267                ver: frame.header.ver,
2268                flags: frame.header.flags,
2269            },
2270        );
2271        bg_sub_by_session.insert((identity.root.clone(), identity.session.clone()), route_id);
2272        arm_bg_wake(
2273            identity.root,
2274            identity.session,
2275            route_id,
2276            bg_wake_pending,
2277            bg_wake_epoch,
2278        );
2279        return Ok(());
2280    }
2281
2282    let call = serde_json::from_slice::<ToolCallRequest>(&frame.body).map_err(SubcError::Json)?;
2283    let bare_name = call.name.clone();
2284    let format_context = crate::subc_format::FormatContext::from_tool_call(
2285        &bare_name,
2286        &call.arguments,
2287        identity.project_root.as_path(),
2288    );
2289
2290    let request_id = format!("subc-{}-{}", frame.header.channel, frame.header.corr);
2291    let bind_trust = identity.trust;
2292    let diagnostics_on_edit = live_roots
2293        .get(&identity.root)
2294        .map(|meta| meta.diagnostics_on_edit)
2295        .unwrap_or(false);
2296
2297    if matches!(bind_trust, BindTrust::Untrusted) && is_bash_family_tool(&bare_name) {
2298        let response = bash_denied_untrusted_response(request_id.clone());
2299        let text = crate::subc_format::format_response_with_context(
2300            &bare_name,
2301            &response,
2302            &format_context,
2303        );
2304        let result = ToolCallResult { text, response };
2305        let response_frame = build_tool_response_frame(
2306            frame.header.ver,
2307            frame.header.channel,
2308            frame.header.corr,
2309            frame.header.flags,
2310            &result,
2311        )?;
2312        return send_frame(tx, response_frame).await;
2313    }
2314
2315    // A non-core name is NOT in the tool manifest. AFT fails closed and
2316    // does not trust subc to enforce the manifest: rejecting here is the
2317    // defense-in-depth backstop that prevents a forwarded native command
2318    // (e.g. `configure`, which would reach handle_configure and bypass
2319    // the RouteBind config-trust cap) from ever reaching dispatch. Only
2320    // the integration-test harness (run_subc_mode_for_test) opens this to
2321    // drive synthetic native commands through the executor.
2322    if !is_subc_agent_core_tool(&call.name)
2323        && !is_subc_native_plumbing_tool(&call.name)
2324        && !allow_native_passthrough
2325    {
2326        log::warn!(
2327            "subc tool call: rejecting non-manifest tool name {:?} on route {} (fail-closed)",
2328            call.name,
2329            frame.header.channel
2330        );
2331        let response = Response::error(
2332            request_id.clone(),
2333            "unknown_tool",
2334            format!("tool {:?} is not in the AFT tool manifest", call.name),
2335        );
2336        let text = crate::subc_format::format_response_with_context(
2337            &bare_name,
2338            &response,
2339            &format_context,
2340        );
2341        let result = ToolCallResult { text, response };
2342        let response_frame = build_tool_response_frame(
2343            frame.header.ver,
2344            frame.header.channel,
2345            frame.header.corr,
2346            frame.header.flags,
2347            &result,
2348        )?;
2349        return send_frame(tx, response_frame).await;
2350    }
2351
2352    if bare_name == "bash" {
2353        let meta = live_roots
2354            .entry(identity.root.clone())
2355            .or_insert_with(|| RootMeta::new(Instant::now()));
2356        meta.active_bash_waits = meta.active_bash_waits.saturating_add(1);
2357        meta.touch();
2358
2359        let route_cancel = route_bash_cancels
2360            .entry(route_id)
2361            .or_insert_with(|| RouteBashCancel {
2362                token: PersistentCancelSignal::new(),
2363                active_waits: 0,
2364            });
2365        route_cancel.active_waits = route_cancel.active_waits.saturating_add(1);
2366        let cancel = BashWaitCancel {
2367            connection: connection_cancel.clone(),
2368            route: route_cancel.token.clone(),
2369        };
2370
2371        submit_deferred_bash(
2372            executor,
2373            bash_deferred_tx,
2374            bash_poll_touch_tx,
2375            dispatch,
2376            identity.root,
2377            identity.project_root,
2378            identity.session,
2379            request_id,
2380            frame.header.channel,
2381            frame.header.corr,
2382            frame.header.flags,
2383            frame.header.ver,
2384            call.arguments,
2385            format_context,
2386            cancel,
2387            bind_trust,
2388        );
2389        return Ok(());
2390    }
2391
2392    let lane = command_lane(&bare_name);
2393    let tool_call_context = ToolCallContext {
2394        project_root: identity.project_root.clone(),
2395        session_id: Some(identity.session.clone()),
2396        request_id: request_id.clone(),
2397        diagnostics_on_edit,
2398        preview: false,
2399    };
2400    let arguments_for_run = call.arguments.clone();
2401    let bare_name_for_run = bare_name.clone();
2402    let bare_name_for_frame = bare_name.clone();
2403    let bare_name_for_finalize = bare_name.clone();
2404    let session_for_log = identity.session.clone();
2405    let session_for_finalize = identity.session.clone();
2406    let request_id_for_force = request_id.clone();
2407    let format_context_for_frame = format_context;
2408    let (text_tx, text_rx) = oneshot::channel::<String>();
2409    let rx = executor.submit_async(
2410        identity.root,
2411        lane,
2412        request_id.clone(),
2413        Box::new(move |ctx| {
2414            log_ctx::with_session(Some(session_for_log.clone()), || {
2415                let run = || {
2416                    let dispatch_with_finalize = |raw_req: RawRequest, app_ctx: &AppContext| {
2417                        let mut response = dispatch(raw_req, app_ctx);
2418                        crate::response_finalize::finalize_response_with_bg_completions(
2419                            &mut response,
2420                            app_ctx,
2421                            &session_for_finalize,
2422                            &bare_name_for_finalize,
2423                            bind_trust.allows_bash_observation(),
2424                        );
2425                        response
2426                    };
2427                    match run_tool_call(
2428                        &bare_name_for_run,
2429                        &arguments_for_run,
2430                        &tool_call_context,
2431                        ctx,
2432                        &dispatch_with_finalize,
2433                    ) {
2434                        ToolCallOutcome::Unary(result) => {
2435                            let _ = text_tx.send(result.text);
2436                            result.response
2437                        }
2438                    }
2439                };
2440                if matches!(bind_trust, BindTrust::Untrusted) {
2441                    ctx.with_force_restrict(&request_id_for_force, run)
2442                } else {
2443                    run()
2444                }
2445            })
2446        }),
2447    );
2448    let completion_tx = tx.clone();
2449    let completion_shutdown = Arc::clone(shutdown);
2450    let route_channel = frame.header.channel;
2451    let corr = frame.header.corr;
2452    let flags = frame.header.flags;
2453    let ver = frame.header.ver;
2454    tokio::spawn(async move {
2455        let response = await_executor_response(rx, request_id.clone()).await;
2456        let text = text_rx.await.unwrap_or_else(|_| {
2457            crate::subc_format::format_response_with_context(
2458                &bare_name_for_frame,
2459                &response,
2460                &format_context_for_frame,
2461            )
2462        });
2463        let result = ToolCallResult { text, response };
2464        let fatal = response_is_fatal_panic(&result.response);
2465        match build_tool_response_frame(ver, route_channel, corr, flags, &result) {
2466            Ok(response_frame) => {
2467                let _ = completion_tx.send(response_frame).await;
2468            }
2469            Err(error) => {
2470                log::error!("subc attach: failed to build tool response frame: {error}");
2471            }
2472        }
2473        if fatal {
2474            signal_fatal_teardown(
2475                &completion_tx,
2476                Some(route_channel),
2477                ver,
2478                corr,
2479                &completion_shutdown,
2480            )
2481            .await;
2482        }
2483    });
2484    Ok(())
2485}
2486
2487#[derive(Clone, Copy, Debug, Default)]
2488struct BashTranslatedSettings {
2489    background: bool,
2490    pty: bool,
2491    wait: bool,
2492    block_to_completion: bool,
2493    timeout: Option<u64>,
2494}
2495
2496enum BashSpawnControl {
2497    Immediate,
2498    Foreground {
2499        task_id: String,
2500        session_id: String,
2501        project_root: Option<PathBuf>,
2502        storage_dir: PathBuf,
2503        deadline: Instant,
2504        block_to_completion: bool,
2505        timeout: Option<u64>,
2506        wait_window_ms: u64,
2507    },
2508}
2509
2510enum BashPollControl {
2511    Done,
2512    Promote,
2513    Wait,
2514}
2515
2516fn bash_settings_from_translated(args: &serde_json::Map<String, Value>) -> BashTranslatedSettings {
2517    BashTranslatedSettings {
2518        background: args
2519            .get("background")
2520            .and_then(Value::as_bool)
2521            .unwrap_or(false),
2522        pty: args.get("pty").and_then(Value::as_bool).unwrap_or(false),
2523        wait: args.get("wait").and_then(Value::as_bool).unwrap_or(false),
2524        block_to_completion: args
2525            .get("block_to_completion")
2526            .and_then(Value::as_bool)
2527            .unwrap_or(false),
2528        timeout: args.get("timeout").and_then(Value::as_u64),
2529    }
2530}
2531
2532fn finalized_bash_result(
2533    mut response: Response,
2534    ctx: &AppContext,
2535    session_id: &str,
2536    format_context: &crate::subc_format::FormatContext,
2537    allow_bg_completions: bool,
2538) -> ToolCallResult {
2539    crate::response_finalize::finalize_response_with_bg_completions(
2540        &mut response,
2541        ctx,
2542        session_id,
2543        "bash",
2544        allow_bg_completions,
2545    );
2546    bash_result_from_response(response, format_context)
2547}
2548
2549fn bash_result_from_response(
2550    response: Response,
2551    format_context: &crate::subc_format::FormatContext,
2552) -> ToolCallResult {
2553    let text = crate::subc_format::format_response_with_context("bash", &response, format_context);
2554    ToolCallResult { text, response }
2555}
2556
2557fn bash_background_launch_response(request_id: &str, task_id: &str, is_pty: bool) -> Response {
2558    Response::success(
2559        request_id,
2560        json!({
2561            "output": crate::commands::bash_orchestrate::format_background_launch(task_id, is_pty),
2562            "task_id": task_id,
2563            "status": "running",
2564            "mode": if is_pty { "pty" } else { "pipes" },
2565        }),
2566    )
2567}
2568
2569fn finish_bash_spawn_immediate(
2570    response: Response,
2571    ctx: &AppContext,
2572    session_id: &str,
2573    format_context: &crate::subc_format::FormatContext,
2574    text_tx: &mut Option<oneshot::Sender<String>>,
2575    control_tx: &mut Option<oneshot::Sender<BashSpawnControl>>,
2576    allow_bg_completions: bool,
2577) -> Response {
2578    let result = finalized_bash_result(
2579        response,
2580        ctx,
2581        session_id,
2582        format_context,
2583        allow_bg_completions,
2584    );
2585    let ToolCallResult { text, response } = result;
2586    if let Some(tx) = text_tx.take() {
2587        let _ = tx.send(text);
2588    }
2589    if let Some(tx) = control_tx.take() {
2590        let _ = tx.send(BashSpawnControl::Immediate);
2591    }
2592    response
2593}
2594
2595fn finish_bash_poll_done(
2596    response: Response,
2597    ctx: &AppContext,
2598    session_id: &str,
2599    format_context: &crate::subc_format::FormatContext,
2600    text_tx: &mut Option<oneshot::Sender<String>>,
2601    control_tx: &mut Option<oneshot::Sender<BashPollControl>>,
2602) -> Response {
2603    let result = finalized_bash_result(response, ctx, session_id, format_context, true);
2604    let ToolCallResult { text, response } = result;
2605    if let Some(tx) = text_tx.take() {
2606        let _ = tx.send(text);
2607    }
2608    if let Some(tx) = control_tx.take() {
2609        let _ = tx.send(BashPollControl::Done);
2610    }
2611    response
2612}
2613
2614#[allow(clippy::too_many_arguments)]
2615fn submit_deferred_bash(
2616    executor: &Arc<Executor>,
2617    completion_tx: &mpsc::Sender<BashDeferredCompletion>,
2618    poll_touch_tx: &mpsc::Sender<ProjectRootId>,
2619    dispatch: DispatchFn,
2620    root: ProjectRootId,
2621    project_root: PathBuf,
2622    session_id: String,
2623    request_id: String,
2624    route_channel: u16,
2625    corr: u64,
2626    flags: Flags,
2627    ver: u8,
2628    arguments: Value,
2629    format_context: crate::subc_format::FormatContext,
2630    cancel: BashWaitCancel,
2631    bind_trust: BindTrust,
2632) {
2633    let (spawn_control_tx, spawn_control_rx) = oneshot::channel::<BashSpawnControl>();
2634    let (spawn_text_tx, spawn_text_rx) = oneshot::channel::<String>();
2635    let root_for_spawn = root.clone();
2636    let request_id_for_spawn = request_id.clone();
2637    let session_for_spawn = session_id.clone();
2638    let project_root_for_spawn = project_root.clone();
2639    let format_context_for_spawn = format_context.clone();
2640    let spawn_rx = executor.submit_async(
2641        root_for_spawn,
2642        Lane::Mutating,
2643        request_id.clone(),
2644        Box::new(move |ctx| {
2645            log_ctx::with_session(Some(session_for_spawn.clone()), || {
2646                let mut spawn_text_tx = Some(spawn_text_tx);
2647                let mut spawn_control_tx = Some(spawn_control_tx);
2648
2649                if matches!(bind_trust, BindTrust::Untrusted) {
2650                    let response = bash_denied_untrusted_response(request_id_for_spawn.clone());
2651                    return finish_bash_spawn_immediate(
2652                        response,
2653                        ctx,
2654                        &session_for_spawn,
2655                        &format_context_for_spawn,
2656                        &mut spawn_text_tx,
2657                        &mut spawn_control_tx,
2658                        false,
2659                    );
2660                }
2661
2662                let translated = match crate::subc_translate::subc_translate(
2663                    "bash",
2664                    &arguments,
2665                    &project_root_for_spawn,
2666                ) {
2667                    Ok(translated) => translated,
2668                    Err(error) => {
2669                        let response = Response::error(
2670                            request_id_for_spawn.clone(),
2671                            error.code,
2672                            error.message,
2673                        );
2674                        return finish_bash_spawn_immediate(
2675                            response,
2676                            ctx,
2677                            &session_for_spawn,
2678                            &format_context_for_spawn,
2679                            &mut spawn_text_tx,
2680                            &mut spawn_control_tx,
2681                            true,
2682                        );
2683                    }
2684                };
2685                let settings = bash_settings_from_translated(&translated.args);
2686                let raw_req = RawRequest {
2687                    id: request_id_for_spawn.clone(),
2688                    command: "bash".to_string(),
2689                    lsp_hints: None,
2690                    session_id: Some(session_for_spawn.clone()),
2691                    params: Value::Object(translated.args),
2692                };
2693                let response = dispatch(raw_req, ctx);
2694                if !response.success {
2695                    return finish_bash_spawn_immediate(
2696                        response,
2697                        ctx,
2698                        &session_for_spawn,
2699                        &format_context_for_spawn,
2700                        &mut spawn_text_tx,
2701                        &mut spawn_control_tx,
2702                        true,
2703                    );
2704                }
2705
2706                let Some(task_id) = response
2707                    .data
2708                    .get("task_id")
2709                    .and_then(Value::as_str)
2710                    .map(str::to_string)
2711                else {
2712                    return finish_bash_spawn_immediate(
2713                        response,
2714                        ctx,
2715                        &session_for_spawn,
2716                        &format_context_for_spawn,
2717                        &mut spawn_text_tx,
2718                        &mut spawn_control_tx,
2719                        true,
2720                    );
2721                };
2722                if response.data.get("status").and_then(Value::as_str) != Some("running") {
2723                    return finish_bash_spawn_immediate(
2724                        response,
2725                        ctx,
2726                        &session_for_spawn,
2727                        &format_context_for_spawn,
2728                        &mut spawn_text_tx,
2729                        &mut spawn_control_tx,
2730                        true,
2731                    );
2732                }
2733
2734                let mode = response
2735                    .data
2736                    .get("mode")
2737                    .and_then(Value::as_str)
2738                    .unwrap_or("pipes");
2739                let is_pty = mode == "pty" || settings.pty;
2740                if is_pty || settings.background {
2741                    let response =
2742                        bash_background_launch_response(&request_id_for_spawn, &task_id, is_pty);
2743                    return finish_bash_spawn_immediate(
2744                        response,
2745                        ctx,
2746                        &session_for_spawn,
2747                        &format_context_for_spawn,
2748                        &mut spawn_text_tx,
2749                        &mut spawn_control_tx,
2750                        true,
2751                    );
2752                }
2753
2754                let wait_window_ms =
2755                    crate::commands::bash_orchestrate::select_foreground_wait_window_ms(
2756                        ctx.config().foreground_wait_window_ms,
2757                        settings.timeout,
2758                        settings.wait,
2759                    );
2760                let deadline = Instant::now() + Duration::from_millis(wait_window_ms);
2761                let storage_dir =
2762                    crate::bash_background::storage_dir(ctx.config().storage_dir.as_deref());
2763                let project_root = ctx.config().project_root.clone();
2764                if let Some(tx) = spawn_control_tx.take() {
2765                    let _ = tx.send(BashSpawnControl::Foreground {
2766                        task_id,
2767                        session_id: session_for_spawn.clone(),
2768                        project_root,
2769                        storage_dir,
2770                        deadline,
2771                        block_to_completion: settings.block_to_completion || settings.wait,
2772                        timeout: settings.timeout,
2773                        wait_window_ms,
2774                    });
2775                }
2776                response
2777            })
2778        }),
2779    );
2780
2781    let executor = Arc::clone(executor);
2782    let completion_tx = completion_tx.clone();
2783    let poll_touch_tx = poll_touch_tx.clone();
2784    let root_for_task = root.clone();
2785    tokio::spawn(async move {
2786        let spawn_response = await_executor_response(spawn_rx, request_id.clone()).await;
2787        let spawn_control = spawn_control_rx.await;
2788        match spawn_control {
2789            Ok(BashSpawnControl::Immediate) => {
2790                let text = spawn_text_rx.await.unwrap_or_else(|_| {
2791                    crate::subc_format::format_response_with_context(
2792                        "bash",
2793                        &spawn_response,
2794                        &format_context,
2795                    )
2796                });
2797                let result = ToolCallResult {
2798                    text,
2799                    response: spawn_response,
2800                };
2801                let fatal = response_is_fatal_panic(&result.response);
2802                send_bash_deferred_completion(
2803                    &completion_tx,
2804                    route_channel,
2805                    corr,
2806                    flags,
2807                    ver,
2808                    root_for_task,
2809                    request_id,
2810                    Some(result),
2811                    fatal,
2812                )
2813                .await;
2814            }
2815            Ok(BashSpawnControl::Foreground {
2816                task_id,
2817                session_id,
2818                project_root,
2819                storage_dir,
2820                deadline,
2821                block_to_completion,
2822                timeout,
2823                wait_window_ms,
2824            }) => {
2825                run_deferred_bash_wait(
2826                    executor,
2827                    completion_tx,
2828                    poll_touch_tx,
2829                    route_channel,
2830                    corr,
2831                    flags,
2832                    ver,
2833                    root_for_task,
2834                    request_id,
2835                    task_id,
2836                    session_id,
2837                    project_root,
2838                    storage_dir,
2839                    deadline,
2840                    block_to_completion,
2841                    timeout,
2842                    wait_window_ms,
2843                    format_context,
2844                    cancel,
2845                )
2846                .await;
2847            }
2848            Err(_) => {
2849                let result = bash_result_from_response(spawn_response, &format_context);
2850                let fatal = response_is_fatal_panic(&result.response);
2851                send_bash_deferred_completion(
2852                    &completion_tx,
2853                    route_channel,
2854                    corr,
2855                    flags,
2856                    ver,
2857                    root_for_task,
2858                    request_id,
2859                    Some(result),
2860                    fatal,
2861                )
2862                .await;
2863            }
2864        }
2865    });
2866}
2867
2868#[allow(clippy::too_many_arguments)]
2869async fn run_deferred_bash_wait(
2870    executor: Arc<Executor>,
2871    completion_tx: mpsc::Sender<BashDeferredCompletion>,
2872    poll_touch_tx: mpsc::Sender<ProjectRootId>,
2873    route_channel: u16,
2874    corr: u64,
2875    flags: Flags,
2876    ver: u8,
2877    root: ProjectRootId,
2878    request_id: String,
2879    task_id: String,
2880    session_id: String,
2881    project_root: Option<PathBuf>,
2882    storage_dir: PathBuf,
2883    deadline: Instant,
2884    block_to_completion: bool,
2885    timeout: Option<u64>,
2886    wait_window_ms: u64,
2887    format_context: crate::subc_format::FormatContext,
2888    cancel: BashWaitCancel,
2889) {
2890    loop {
2891        tokio::select! {
2892            _ = cancel.cancelled() => {
2893                send_bash_deferred_completion(
2894                    &completion_tx,
2895                    route_channel,
2896                    corr,
2897                    flags,
2898                    ver,
2899                    root,
2900                    request_id,
2901                    None,
2902                    false,
2903                )
2904                .await;
2905                break;
2906            }
2907            _ = tokio::time::sleep(PENDING_POLL_INTERVAL) => {
2908                let (poll_control_tx, poll_control_rx) = oneshot::channel::<BashPollControl>();
2909                let (poll_text_tx, poll_text_rx) = oneshot::channel::<String>();
2910                let root_for_poll = root.clone();
2911                let request_id_for_poll = request_id.clone();
2912                let task_id_for_poll = task_id.clone();
2913                let session_for_poll = session_id.clone();
2914                let storage_for_poll = storage_dir.clone();
2915                let project_root_for_poll = project_root.clone();
2916                let format_context_for_poll = format_context.clone();
2917                let poll_rx = executor.submit_async(
2918                    root_for_poll,
2919                    Lane::PureRead,
2920                    request_id.clone(),
2921                    Box::new(move |ctx| {
2922                        log_ctx::with_session(Some(session_for_poll.clone()), || {
2923                            let mut poll_text_tx = Some(poll_text_tx);
2924                            let mut poll_control_tx = Some(poll_control_tx);
2925
2926                            let Some(snapshot) = crate::commands::bash_orchestrate::poll_bash_status(
2927                                ctx,
2928                                &task_id_for_poll,
2929                                &session_for_poll,
2930                                project_root_for_poll.as_deref(),
2931                                &storage_for_poll,
2932                                crate::bash_background::output::RUNNING_OUTPUT_PREVIEW_BYTES,
2933                            ) else {
2934                                return finish_bash_poll_done(
2935                                    crate::commands::bash_orchestrate::task_not_found_response(
2936                                        &request_id_for_poll,
2937                                        &task_id_for_poll,
2938                                    ),
2939                                    ctx,
2940                                    &session_for_poll,
2941                                    &format_context_for_poll,
2942                                    &mut poll_text_tx,
2943                                    &mut poll_control_tx,
2944                                );
2945                            };
2946
2947                            match crate::commands::bash_orchestrate::decide_bash_step(
2948                                snapshot,
2949                                deadline,
2950                                block_to_completion,
2951                                Instant::now(),
2952                                &request_id_for_poll,
2953                            ) {
2954                                crate::commands::bash_orchestrate::BashStep::Done(response) => {
2955                                    finish_bash_poll_done(
2956                                        response,
2957                                        ctx,
2958                                        &session_for_poll,
2959                                        &format_context_for_poll,
2960                                        &mut poll_text_tx,
2961                                        &mut poll_control_tx,
2962                                    )
2963                                }
2964                                crate::commands::bash_orchestrate::BashStep::Promote => {
2965                                    if let Some(tx) = poll_control_tx.take() {
2966                                        let _ = tx.send(BashPollControl::Promote);
2967                                    }
2968                                    Response::success(
2969                                        request_id_for_poll,
2970                                        json!({ "subc_bash_step": "promote" }),
2971                                    )
2972                                }
2973                                crate::commands::bash_orchestrate::BashStep::Wait => {
2974                                    if let Some(tx) = poll_control_tx.take() {
2975                                        let _ = tx.send(BashPollControl::Wait);
2976                                    }
2977                                    Response::success(
2978                                        request_id_for_poll,
2979                                        json!({ "subc_bash_step": "wait" }),
2980                                    )
2981                                }
2982                            }
2983                        })
2984                    }),
2985                );
2986                let poll_response = await_executor_response(poll_rx, request_id.clone()).await;
2987                let _ = poll_touch_tx.send(root.clone()).await;
2988                match poll_control_rx.await.unwrap_or(BashPollControl::Done) {
2989                    BashPollControl::Done => {
2990                        let text = poll_text_rx.await.unwrap_or_else(|_| {
2991                            crate::subc_format::format_response_with_context(
2992                                "bash",
2993                                &poll_response,
2994                                &format_context,
2995                            )
2996                        });
2997                        let result = ToolCallResult {
2998                            text,
2999                            response: poll_response,
3000                        };
3001                        let fatal = response_is_fatal_panic(&result.response);
3002                        send_bash_deferred_completion(
3003                            &completion_tx,
3004                            route_channel,
3005                            corr,
3006                            flags,
3007                            ver,
3008                            root,
3009                            request_id,
3010                            Some(result),
3011                            fatal,
3012                        )
3013                        .await;
3014                        break;
3015                    }
3016                    BashPollControl::Promote => {
3017                        let result = submit_bash_promote(
3018                            &executor,
3019                            root.clone(),
3020                            request_id.clone(),
3021                            task_id.clone(),
3022                            session_id.clone(),
3023                            timeout,
3024                            wait_window_ms,
3025                            format_context.clone(),
3026                        )
3027                        .await;
3028                        let fatal = response_is_fatal_panic(&result.response);
3029                        send_bash_deferred_completion(
3030                            &completion_tx,
3031                            route_channel,
3032                            corr,
3033                            flags,
3034                            ver,
3035                            root,
3036                            request_id,
3037                            Some(result),
3038                            fatal,
3039                        )
3040                        .await;
3041                        break;
3042                    }
3043                    BashPollControl::Wait => {}
3044                }
3045            }
3046        }
3047    }
3048}
3049
3050async fn submit_bash_promote(
3051    executor: &Arc<Executor>,
3052    root: ProjectRootId,
3053    request_id: String,
3054    task_id: String,
3055    session_id: String,
3056    timeout: Option<u64>,
3057    wait_window_ms: u64,
3058    format_context: crate::subc_format::FormatContext,
3059) -> ToolCallResult {
3060    let (text_tx, text_rx) = oneshot::channel::<String>();
3061    let request_id_for_promote = request_id.clone();
3062    let task_id_for_promote = task_id.clone();
3063    let session_for_promote = session_id.clone();
3064    let format_context_for_promote = format_context.clone();
3065    let promote_rx = executor.submit_async(
3066        root,
3067        Lane::Mutating,
3068        request_id.clone(),
3069        Box::new(move |ctx| {
3070            log_ctx::with_session(Some(session_for_promote.clone()), || {
3071                let response = if let Some(value) =
3072                    std::env::var_os("AFT_TEST_FORCE_SUBC_BASH_PROMOTE_ERROR")
3073                {
3074                    if value.to_string_lossy() == "panic" {
3075                        panic!("forced subc bash promote panic");
3076                    }
3077                    Response::error(
3078                        &request_id_for_promote,
3079                        "execution_failed",
3080                        "forced subc bash promote failure",
3081                    )
3082                } else {
3083                    crate::commands::bash_orchestrate::promote_bash(
3084                        ctx,
3085                        &task_id_for_promote,
3086                        &session_for_promote,
3087                        ctx.config().project_root.as_deref(),
3088                        timeout,
3089                        wait_window_ms,
3090                        &request_id_for_promote,
3091                    )
3092                };
3093                let result = finalized_bash_result(
3094                    response,
3095                    ctx,
3096                    &session_for_promote,
3097                    &format_context_for_promote,
3098                    true,
3099                );
3100                let ToolCallResult { text, response } = result;
3101                let _ = text_tx.send(text);
3102                response
3103            })
3104        }),
3105    );
3106    let response = await_executor_response(promote_rx, request_id).await;
3107    let text = text_rx.await.unwrap_or_else(|_| {
3108        crate::subc_format::format_response_with_context("bash", &response, &format_context)
3109    });
3110    ToolCallResult { text, response }
3111}
3112
3113#[allow(clippy::too_many_arguments)]
3114async fn send_bash_deferred_completion(
3115    completion_tx: &mpsc::Sender<BashDeferredCompletion>,
3116    channel: u16,
3117    corr: u64,
3118    flags: Flags,
3119    ver: u8,
3120    root: ProjectRootId,
3121    request_id: String,
3122    result: Option<ToolCallResult>,
3123    fatal: bool,
3124) {
3125    let _ = completion_tx
3126        .send(BashDeferredCompletion {
3127            channel,
3128            corr,
3129            flags,
3130            ver,
3131            root,
3132            request_id,
3133            result,
3134            fatal,
3135        })
3136        .await;
3137}
3138
3139async fn handle_bash_deferred_completion(
3140    tx: &mpsc::Sender<Frame>,
3141    done: BashDeferredCompletion,
3142    routes: &HashMap<RouteChannel, RouteIdentity>,
3143    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
3144    route_bash_cancels: &mut HashMap<RouteChannel, RouteBashCancel>,
3145    shutdown: &Arc<Notify>,
3146) -> Result<(), SubcError> {
3147    if let Some(meta) = live_roots.get_mut(&done.root) {
3148        meta.active_bash_waits = meta.active_bash_waits.saturating_sub(1);
3149        meta.touch();
3150    }
3151    let route_id = route_key(done.channel);
3152    let remove_route_cancel = if let Some(cancel) = route_bash_cancels.get_mut(&route_id) {
3153        cancel.active_waits = cancel.active_waits.saturating_sub(1);
3154        cancel.active_waits == 0
3155    } else {
3156        false
3157    };
3158    if remove_route_cancel {
3159        route_bash_cancels.remove(&route_id);
3160    }
3161
3162    if let Some(result) = done.result {
3163        if routes.contains_key(&route_id) {
3164            let frame =
3165                build_tool_response_frame(done.ver, done.channel, done.corr, done.flags, &result)?;
3166            send_frame(tx, frame).await?;
3167        } else {
3168            log::debug!(
3169                "subc attach: dropping deferred bash response {} for unbound route {}",
3170                done.request_id,
3171                done.channel
3172            );
3173        }
3174    } else {
3175        log::debug!(
3176            "subc attach: deferred bash wait {} cancelled before delivery on route {}",
3177            done.request_id,
3178            done.channel
3179        );
3180    }
3181
3182    if done.fatal {
3183        signal_fatal_teardown(tx, Some(done.channel), done.ver, done.corr, shutdown).await;
3184    }
3185    Ok(())
3186}
3187fn submit_maintenance_drain(
3188    executor: &Arc<Executor>,
3189    root_id: ProjectRootId,
3190    bg_sessions_to_check: Vec<(String, u64)>,
3191    completion_tx: &mpsc::Sender<MaintenanceCompletion>,
3192) {
3193    let request_id = format!(
3194        "subc-maintenance-drain-{}",
3195        root_id.as_path().to_string_lossy()
3196    );
3197    let response_id = request_id.clone();
3198    let completion_root_id = root_id.clone();
3199    let (empty_bg_sessions_tx, empty_bg_sessions_rx) = oneshot::channel::<Vec<(String, u64)>>();
3200    let rx = executor.submit_async(
3201        root_id,
3202        Lane::Mutating,
3203        request_id.clone(),
3204        Box::new(move |ctx| {
3205            runtime_drain::drain_configure_warning_events(ctx);
3206            runtime_drain::drain_search_index_events(ctx);
3207            runtime_drain::drain_callgraph_store_events(ctx);
3208            runtime_drain::drain_semantic_index_events(ctx);
3209            runtime_drain::drain_semantic_refresh_events(ctx);
3210            runtime_drain::drain_inspect_events(ctx);
3211            runtime_drain::drain_watcher_events(ctx);
3212            runtime_drain::drain_lsp_events(ctx);
3213            let empty_bg_sessions = bg_sessions_to_check
3214                .into_iter()
3215                .filter(|(session, _)| {
3216                    !ctx.bash_background()
3217                        .has_completions_for_session(Some(session.as_str()))
3218                })
3219                .collect();
3220            let _ = empty_bg_sessions_tx.send(empty_bg_sessions);
3221            Response::success(response_id, json!({ "drained": true }))
3222        }),
3223    );
3224    let completion_tx = completion_tx.clone();
3225    tokio::spawn(async move {
3226        let response = await_executor_response(rx, request_id).await;
3227        let empty_bg_sessions = empty_bg_sessions_rx.await.unwrap_or_default();
3228        let _ = completion_tx
3229            .send(MaintenanceCompletion {
3230                root_id: completion_root_id,
3231                response,
3232                empty_bg_sessions,
3233            })
3234            .await;
3235    });
3236}
3237
3238async fn await_executor_response(rx: oneshot::Receiver<Response>, request_id: String) -> Response {
3239    rx.await
3240        .unwrap_or_else(|_| Response::error(request_id, "internal_error", "executor dropped"))
3241}
3242
3243/// Flatten a tool-call `Response` + server-rendered `text` into the SAME flat
3244/// object the standalone NDJSON `tool_call` command puts on the wire:
3245/// `{id, success, ...data, text}` (Response flattens `data` to the top level —
3246/// protocol.rs — and `response_with_text` merges `text` in). Mirrors
3247/// `commands::tool_call::response_with_text` exactly, including its non-object
3248/// `data` fallback (data replaced by `{text}`), so the subc `structuredContent`
3249/// is byte-identical to the standalone response body. Built field-by-field
3250/// rather than via `serde_json::to_value(response)` because `#[serde(flatten)]`
3251/// of a non-object `data` would error.
3252fn flat_tool_response(response: &crate::protocol::Response, text: &str) -> Value {
3253    let mut obj = serde_json::Map::new();
3254    obj.insert("id".to_string(), Value::String(response.id.clone()));
3255    obj.insert("success".to_string(), Value::Bool(response.success));
3256    if let Some(data) = response.data.as_object() {
3257        for (key, value) in data {
3258            obj.insert(key.clone(), value.clone());
3259        }
3260    }
3261    obj.insert("text".to_string(), Value::String(text.to_string()));
3262    Value::Object(obj)
3263}
3264
3265fn build_tool_response_frame(
3266    ver: u8,
3267    route_channel: u16,
3268    corr: u64,
3269    flags: Flags,
3270    result: &ToolCallResult,
3271) -> Result<Frame, SubcError> {
3272    let is_error = !result.response.success;
3273    // `content`/`isError` is the MCP-native surface a GENERIC host reads (and a
3274    // generic host ignores `structuredContent`, per the MCP spec). The
3275    // FIRST-PARTY AFT plugin instead reads `structuredContent`, which carries
3276    // the full flat standalone shape ({id, success, ...data, text}) so every
3277    // structured sidecar the plugin drives UI from — status_bar, bg_completions
3278    // (in-band drain), preview_diff, code, message, attachments — survives the
3279    // route. subc relays the body byte-for-byte, so this reaches the plugin
3280    // unchanged. SubcTransport.toolCall re-lifts `structuredContent` straight to
3281    // the flat ToolCallResult, so nothing downstream of the transport differs
3282    // from the NDJSON path.
3283    let payload = json!({
3284        "content": [{ "type": "text", "text": result.text.as_str() }],
3285        "isError": is_error,
3286        "structuredContent": flat_tool_response(&result.response, &result.text),
3287    });
3288    let body = serde_json::to_vec(&payload).map_err(SubcError::Json)?;
3289
3290    Frame::build_with_version(ver, FrameType::Response, flags, route_channel, corr, body)
3291        .map_err(SubcError::FrameBuild)
3292}
3293
3294fn build_error_frame(
3295    ver: u8,
3296    channel: u16,
3297    corr: u64,
3298    flags: Flags,
3299    code: &str,
3300    message: &str,
3301) -> Result<Frame, SubcError> {
3302    let body = serde_json::to_vec(&ErrorBody {
3303        code: code.to_string(),
3304        message: message.to_string(),
3305    })
3306    .map_err(SubcError::Json)?;
3307    Frame::build_with_version(ver, FrameType::Error, flags, channel, corr, body)
3308        .map_err(SubcError::FrameBuild)
3309}
3310
3311fn build_goodbye_frame(ver: u8, channel: u16, corr: u64) -> Result<Frame, SubcError> {
3312    Frame::build_with_version(
3313        ver,
3314        FrameType::Goodbye,
3315        control_flags(),
3316        channel,
3317        corr,
3318        Vec::new(),
3319    )
3320    .map_err(SubcError::FrameBuild)
3321}
3322
3323async fn signal_fatal_teardown(
3324    tx: &mpsc::Sender<Frame>,
3325    route_channel: Option<u16>,
3326    ver: u8,
3327    corr: u64,
3328    shutdown: &Arc<Notify>,
3329) {
3330    if let Some(route_channel) = route_channel {
3331        if let Ok(frame) = build_goodbye_frame(ver, route_channel, corr) {
3332            if let Err(error) = send_frame(tx, frame).await {
3333                log::warn!(
3334                    "subc attach: failed to queue fatal route Goodbye for route {route_channel}: {error}"
3335                );
3336            }
3337        }
3338    }
3339    if let Ok(frame) = build_goodbye_frame(ver, 0, 0) {
3340        if let Err(error) = send_frame(tx, frame).await {
3341            log::warn!("subc attach: failed to queue fatal channel-0 Goodbye: {error}");
3342        }
3343    }
3344    shutdown.notify_one();
3345}
3346
3347fn response_message(response: &Response, fallback: &str) -> String {
3348    response
3349        .data
3350        .get("message")
3351        .and_then(Value::as_str)
3352        .map(ToOwned::to_owned)
3353        .unwrap_or_else(|| fallback.to_string())
3354}
3355
3356fn response_is_fatal_panic(response: &Response) -> bool {
3357    !response.success && response.data.get("code").and_then(Value::as_str) == Some("actor_fatal")
3358}
3359
3360fn bash_denied_untrusted_response(request_id: impl Into<String>) -> Response {
3361    Response::error(
3362        request_id.into(),
3363        "bash_denied_untrusted",
3364        "remote/MCP-facade binds cannot run shell commands",
3365    )
3366}
3367
3368fn is_bash_family_tool(name: &str) -> bool {
3369    name == "bash" || name.starts_with("bash_")
3370}
3371
3372fn is_subc_agent_core_tool(name: &str) -> bool {
3373    matches!(
3374        name,
3375        "status"
3376            | "bash"
3377            | "read"
3378            | "write"
3379            | "edit"
3380            | "apply_patch"
3381            | "grep"
3382            | "glob"
3383            | "search"
3384            | "outline"
3385            | "zoom"
3386            | "inspect"
3387            | "callgraph"
3388            | "conflicts"
3389            | "ast_search"
3390            | "ast_replace"
3391            | "delete"
3392            | "move"
3393            | "import"
3394            | "refactor"
3395            | "safety"
3396    )
3397}
3398
3399/// Internal bg-completion plumbing commands the harness consumer (NOT the agent)
3400/// invokes over a bound route to drain and acknowledge background-bash
3401/// completions for its session. These are NOT agent-facing tools — they carry no
3402/// agent surface and never reach the model — so they're not in the manifest /
3403/// `is_subc_agent_core_tool`, but the plugin's bg-notification drain/ack path
3404/// (bg-notifications.ts: `bridge.send("bash_drain_completions"|"bash_ack_completions")`)
3405/// must reach dispatch over subc, otherwise an idle agent can never drain a
3406/// completion the wake lane nudges it about.
3407///
3408/// This is a DELIBERATELY TIGHT allowlist (exactly these two names), kept
3409/// separate from the agent core-tool gate so it cannot widen the fail-closed
3410/// backstop in `handle_tool_call`. Both are session-scoped (the bind session is
3411/// reinjected by `run_tool_call`, overriding any body `session_id`) and touch
3412/// only the per-session completion registry — they carry NO config/trust surface,
3413/// so admitting them does not reopen the `configure`-bypass hole the gate exists
3414/// to close. Lanes are already assigned: `bash_drain_completions` = PureRead,
3415/// `bash_ack_completions` = Mutating (see `command_lane`).
3416fn is_subc_native_plumbing_tool(name: &str) -> bool {
3417    matches!(name, "bash_drain_completions" | "bash_ack_completions")
3418}
3419
3420fn command_lane(command: &str) -> Lane {
3421    match command {
3422        "ping"
3423        | "version"
3424        | "echo"
3425        | "bash_drain_completions"
3426        | "bash_regex_match"
3427        | "db_get_state"
3428        | "db_get_host_state"
3429        | "read"
3430        | "undo_preview"
3431        | "edit_history"
3432        | "checkpoint_paths"
3433        | "list_checkpoints"
3434        | "conflicts"
3435        | "glob"
3436        | "grep"
3437        | "git_conflicts"
3438        | "ast_search" => Lane::PureRead,
3439
3440        // Lazy reads mutate parser/terminal/url caches on a miss, but are still
3441        // classified onto the reader pool; install races are handled at the
3442        // individual cache sites.
3443        "bash_status" | "outline" | "zoom" => Lane::PureRead,
3444
3445        "status"
3446        | "inspect"
3447        | "lsp_diagnostics"
3448        | "lsp_inspect"
3449        | "lsp_hover"
3450        | "lsp_goto_definition"
3451        | "lsp_find_references"
3452        | "lsp_prepare_rename" => Lane::SerialLspStatus,
3453
3454        "semantic_search" | "search" | "callgraph" | "callers" | "impact" | "call_tree"
3455        | "trace_to" | "trace_to_symbol" | "trace_data" | "inspect_tier2_run" => Lane::HeavyInit,
3456
3457        "bash"
3458        | "bash_ack_completions"
3459        | "bash_notify"
3460        | "bash_unnotify"
3461        | "bash_promote"
3462        | "bash_kill"
3463        | "bash_write"
3464        | "db_set_state"
3465        | "db_set_host_state"
3466        | "undo"
3467        | "checkpoint"
3468        | "restore_checkpoint"
3469        | "write"
3470        | "delete_file"
3471        | "move_file"
3472        | "edit"
3473        | "edit_symbol"
3474        | "edit_match"
3475        | "batch"
3476        | "add_import"
3477        | "remove_import"
3478        | "organize_imports"
3479        | "configure"
3480        | "move_symbol"
3481        | "extract_function"
3482        | "inline_symbol"
3483        | "ast_replace"
3484        | "lsp_rename"
3485        | "list_filters"
3486        | "trust_filter_project"
3487        | "untrust_filter_project"
3488        | "snapshot" => Lane::Mutating,
3489
3490        _ => Lane::Mutating,
3491    }
3492}
3493
3494#[derive(Deserialize)]
3495struct BgEventsProbe {
3496    op: Option<String>,
3497}
3498
3499#[derive(Debug, Deserialize)]
3500struct ToolCallRequest {
3501    name: String,
3502    #[serde(default)]
3503    arguments: Value,
3504}
3505
3506static SUBC_TOOL_SCHEMAS: LazyLock<serde_json::Map<String, Value>> = LazyLock::new(|| {
3507    serde_json::from_str(include_str!("subc_tool_schemas.json"))
3508        .unwrap_or_else(|e| panic!("subc_tool_schemas.json: {e}"))
3509});
3510
3511fn tool_schema(name: &str) -> Value {
3512    SUBC_TOOL_SCHEMAS.get(name).cloned().unwrap_or_else(|| {
3513        log::warn!(
3514            "subc build_manifest: missing embedded schema for tool {name:?}; using placeholder"
3515        );
3516        json!({ "type": "object" })
3517    })
3518}
3519
3520/// AFT's subc-mode capability manifest. It uses bare internal tool names
3521/// because the gateway adds any `aft_` prefix for agent-facing displays; AFT
3522/// schedules concurrent calls itself; the gateway runs AFT directly without a
3523/// sandbox. The manifest lists every tool an agent can call over subc.
3524fn build_manifest() -> ModuleManifest {
3525    let tool = |name: &str, execution_mode: ExecutionMode| Tool {
3526        name: name.to_string(),
3527        execution_mode,
3528        schema: tool_schema(name),
3529    };
3530    // execution_mode keys on externally-observable side effects, NOT internal
3531    // ctx mutation: the readers warm AFT's own index/cache/symbol artifacts
3532    // (internal), not the user's workspace, so they are Pure. Bash is Mutating
3533    // because spawning a detached process changes external state, and edit/write
3534    // produce observable file writes. Unfenceable stays unused here because AFT
3535    // schedules bash internally and releases the Mutating worker after spawn.
3536    ModuleManifest {
3537        module_id: "aft".to_string(),
3538        module_version: env!("CARGO_PKG_VERSION").to_string(),
3539        protocol_ver: PROTOCOL_VERSION,
3540        trust_tier: TrustTier::FirstParty,
3541        provides: vec![ProviderRole::ToolProvider {
3542            tools: vec![
3543                tool("status", ExecutionMode::Pure),
3544                tool("bash", ExecutionMode::Mutating),
3545                tool("read", ExecutionMode::Pure),
3546                tool("write", ExecutionMode::Mutating),
3547                tool("edit", ExecutionMode::Mutating),
3548                tool("apply_patch", ExecutionMode::Mutating),
3549                tool("grep", ExecutionMode::Pure),
3550                tool("glob", ExecutionMode::Pure),
3551                tool("search", ExecutionMode::Pure),
3552                tool("outline", ExecutionMode::Pure),
3553                tool("zoom", ExecutionMode::Pure),
3554                tool("inspect", ExecutionMode::Pure),
3555                tool("callgraph", ExecutionMode::Pure),
3556                tool("conflicts", ExecutionMode::Pure),
3557                tool("ast_search", ExecutionMode::Pure),
3558                tool("ast_replace", ExecutionMode::Mutating),
3559                tool("delete", ExecutionMode::Mutating),
3560                tool("move", ExecutionMode::Mutating),
3561                tool("import", ExecutionMode::Mutating),
3562                tool("refactor", ExecutionMode::Mutating),
3563                tool("safety", ExecutionMode::Mutating),
3564            ],
3565            identity_scope: vec![IdentityScope::Session, IdentityScope::Project],
3566            concurrency: Concurrency::ModuleManaged,
3567            emits_push: true,
3568            sub_supervises: true,
3569        }],
3570        consumes: Vec::new(),
3571        scheduled_tasks: Vec::new(),
3572        bindings: Bindings {
3573            storage: StorageBinding {
3574                kind: StorageKind::Sqlite,
3575                scope: StorageScope::Project,
3576                owns_schema: true,
3577            },
3578            vault_grants: Vec::new(),
3579            identity: IdentityBinding {
3580                requires: vec![IdentityScope::Project],
3581                optional: vec![IdentityScope::Session],
3582            },
3583        },
3584    }
3585}
3586
3587fn control_flags() -> Flags {
3588    Flags::new(false, Priority::Passive, false)
3589}
3590
3591#[cfg(test)]
3592mod tests {
3593    use super::*;
3594    use crate::bash_background::BgTaskStatus;
3595    use crate::protocol::{
3596        BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, ConfigureWarningsFrame,
3597        ProgressFrame, StatusChangedFrame,
3598    };
3599    use serde_json::json;
3600
3601    fn test_root(name: &str) -> (tempfile::TempDir, ProjectRootId) {
3602        let dir = tempfile::Builder::new()
3603            .prefix(name)
3604            .tempdir()
3605            .expect("temp root");
3606        let root = ProjectRootId::from_path(dir.path()).expect("project root id");
3607        (dir, root)
3608    }
3609
3610    fn status_frame(seq: u64) -> PushFrame {
3611        status_frame_with_session(seq, None)
3612    }
3613
3614    fn status_frame_with_session(seq: u64, session_id: Option<&str>) -> PushFrame {
3615        PushFrame::StatusChanged(StatusChangedFrame {
3616            frame_type: "status_changed",
3617            session_id: session_id.map(str::to_string),
3618            snapshot: json!({ "seq": seq }),
3619        })
3620    }
3621
3622    fn completion_frame(task_id: &str) -> PushFrame {
3623        completion_frame_with_session(task_id, "session-1")
3624    }
3625
3626    fn completion_frame_with_session(task_id: &str, session_id: &str) -> PushFrame {
3627        PushFrame::BashCompleted(BashCompletedFrame {
3628            frame_type: "bash_completed",
3629            task_id: task_id.to_string(),
3630            session_id: session_id.to_string(),
3631            status: BgTaskStatus::Completed,
3632            exit_code: Some(0),
3633            command: format!("echo {task_id}"),
3634            output_preview: String::new(),
3635            output_truncated: false,
3636            original_tokens: None,
3637            compressed_tokens: None,
3638            tokens_skipped: false,
3639        })
3640    }
3641
3642    fn long_running_frame(task_id: &str, elapsed_ms: u64) -> PushFrame {
3643        long_running_frame_with_session(task_id, "session-1", elapsed_ms)
3644    }
3645
3646    fn long_running_frame_with_session(
3647        task_id: &str,
3648        session_id: &str,
3649        elapsed_ms: u64,
3650    ) -> PushFrame {
3651        PushFrame::BashLongRunning(BashLongRunningFrame {
3652            frame_type: "bash_long_running",
3653            task_id: task_id.to_string(),
3654            session_id: session_id.to_string(),
3655            command: format!("sleep {elapsed_ms}"),
3656            elapsed_ms,
3657        })
3658    }
3659
3660    fn pattern_match_frame(session_id: &str) -> PushFrame {
3661        PushFrame::BashPatternMatch(BashPatternMatchFrame {
3662            frame_type: "bash_pattern_match",
3663            task_id: "task-pattern".to_string(),
3664            session_id: session_id.to_string(),
3665            watch_id: "watch-1".to_string(),
3666            match_text: "needle".to_string(),
3667            match_offset: 7,
3668            context: "haystack needle".to_string(),
3669            once: true,
3670            reason: "pattern_match",
3671        })
3672    }
3673
3674    fn configure_warnings_frame(session_id: Option<&str>) -> PushFrame {
3675        PushFrame::ConfigureWarnings(ConfigureWarningsFrame {
3676            frame_type: "configure_warnings",
3677            session_id: session_id.map(str::to_string),
3678            project_root: "/tmp/subc-test".to_string(),
3679            source_file_count: 0,
3680            warnings: Vec::new(),
3681        })
3682    }
3683
3684    fn route_identity(root: &ProjectRootId, session_id: &str) -> RouteIdentity {
3685        route_identity_with_trust(root, session_id, BindTrust::FirstParty)
3686    }
3687
3688    fn route_identity_with_trust(
3689        root: &ProjectRootId,
3690        session_id: &str,
3691        trust: BindTrust,
3692    ) -> RouteIdentity {
3693        RouteIdentity {
3694            root: root.clone(),
3695            project_root: root.as_path().to_path_buf(),
3696            harness: "opencode".to_string(),
3697            session: session_id.to_string(),
3698            trust,
3699        }
3700    }
3701
3702    fn progress_frame(request_id: &str, kind: ProgressKind, chunk: &str) -> PushFrame {
3703        PushFrame::Progress(ProgressFrame::new(request_id, kind, chunk))
3704    }
3705
3706    fn status_seq(frame: &PushFrame) -> Option<u64> {
3707        match frame {
3708            PushFrame::StatusChanged(status) => status.snapshot.get("seq").and_then(|v| v.as_u64()),
3709            _ => None,
3710        }
3711    }
3712
3713    fn completion_task(frame: &PushFrame) -> Option<&str> {
3714        match frame {
3715            PushFrame::BashCompleted(completion) => Some(completion.task_id.as_str()),
3716            _ => None,
3717        }
3718    }
3719
3720    fn push_frame_task_id(frame: &Frame) -> Option<String> {
3721        let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
3722        body.get("task_id")
3723            .and_then(serde_json::Value::as_str)
3724            .map(str::to_string)
3725    }
3726
3727    #[test]
3728    fn trust_for_principal_matrix() {
3729        assert_eq!(
3730            trust_for_principal(&Some(Principal::Direct)),
3731            BindTrust::FirstParty
3732        );
3733        assert_eq!(
3734            trust_for_principal(&Some(Principal::Reserved {
3735                module_id: "llm-runner".to_string(),
3736            })),
3737            BindTrust::FirstParty
3738        );
3739        assert_eq!(
3740            trust_for_principal(&Some(Principal::Reserved {
3741                module_id: "aft".to_string(),
3742            })),
3743            BindTrust::FirstParty
3744        );
3745        assert_eq!(
3746            trust_for_principal(&Some(Principal::Reserved {
3747                module_id: "subc-mcp".to_string(),
3748            })),
3749            BindTrust::Untrusted
3750        );
3751        assert_eq!(
3752            trust_for_principal(&Some(Principal::Reserved {
3753                module_id: "anything-unknown".to_string(),
3754            })),
3755            BindTrust::Untrusted
3756        );
3757        assert_eq!(
3758            trust_for_principal(&Some(Principal::Unverified)),
3759            BindTrust::Untrusted
3760        );
3761        assert_eq!(trust_for_principal(&None), BindTrust::Untrusted);
3762    }
3763
3764    #[test]
3765    fn frame_classification_matches_push_delivery_contract() {
3766        let completion = completion_frame_with_session("done", "session-a");
3767        assert_eq!(frame_session(&completion), Some("session-a"));
3768        assert!(frame_is_reliable(&completion));
3769
3770        let long_running = long_running_frame_with_session("long", "session-b", 42);
3771        assert_eq!(frame_session(&long_running), Some("session-b"));
3772        assert!(!frame_is_reliable(&long_running));
3773
3774        let pattern_match = pattern_match_frame("session-c");
3775        assert_eq!(frame_session(&pattern_match), Some("session-c"));
3776        assert!(frame_is_reliable(&pattern_match));
3777
3778        let tagged_warnings = configure_warnings_frame(Some("session-d"));
3779        assert_eq!(frame_session(&tagged_warnings), Some("session-d"));
3780        assert!(frame_is_reliable(&tagged_warnings));
3781
3782        let untagged_warnings = configure_warnings_frame(None);
3783        assert_eq!(frame_session(&untagged_warnings), None);
3784        assert!(frame_is_reliable(&untagged_warnings));
3785
3786        let tagged_status = status_frame_with_session(1, Some("session-e"));
3787        assert_eq!(frame_session(&tagged_status), Some("session-e"));
3788        assert!(!frame_is_reliable(&tagged_status));
3789
3790        let project_status = status_frame(2);
3791        assert_eq!(frame_session(&project_status), None);
3792        assert!(!frame_is_reliable(&project_status));
3793
3794        let progress = progress_frame("request-1", ProgressKind::Stdout, "chunk");
3795        assert_eq!(frame_session(&progress), None);
3796        assert!(!frame_is_reliable(&progress));
3797    }
3798
3799    #[test]
3800    fn fan_out_push_frame_routes_session_scoped_and_project_scoped_frames() {
3801        let (_root_dir, root) = test_root("subc-session-routing-root");
3802        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(8);
3803        let identity1 = route_identity(&root, "session-1");
3804        let identity2 = route_identity(&root, "session-2");
3805        let mut routes = HashMap::new();
3806        routes.insert(route_key(1), identity1.clone());
3807        routes.insert(route_key(2), identity2.clone());
3808        let mut root_channels = HashMap::new();
3809        root_channels.insert(root.clone(), HashSet::from([route_key(1), route_key(2)]));
3810        let mut session_identity = HashMap::new();
3811        remember_session_identity(&mut session_identity, &identity1);
3812        remember_session_identity(&mut session_identity, &identity2);
3813        let mut retry_buffer = HashMap::new();
3814        let mut push_buffer = HashMap::new();
3815
3816        let session_result = fan_out_reliable_push_frame(
3817            &writer_tx,
3818            &routes,
3819            &root_channels,
3820            &session_identity,
3821            &mut retry_buffer,
3822            &mut push_buffer,
3823            &root,
3824            &completion_frame_with_session("session-only", "session-1"),
3825        );
3826        assert_eq!(
3827            session_result,
3828            FanOutResult {
3829                matched_channels: 1,
3830                sent_frames: 1,
3831            }
3832        );
3833        assert!(retry_buffer.is_empty());
3834        assert!(push_buffer.is_empty());
3835        let session_push = writer_rx.try_recv().expect("session push queued");
3836        assert_eq!(session_push.header.ty, FrameType::Push);
3837        assert_eq!(session_push.header.channel, 1);
3838        assert!(
3839            writer_rx.try_recv().is_err(),
3840            "session-scoped frame must not broadcast to sibling sessions"
3841        );
3842
3843        let project_result =
3844            fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(9));
3845        assert_eq!(
3846            project_result,
3847            FanOutResult {
3848                matched_channels: 2,
3849                sent_frames: 2,
3850            }
3851        );
3852        let project_channels: HashSet<_> = [
3853            writer_rx
3854                .try_recv()
3855                .expect("first project push")
3856                .header
3857                .channel,
3858            writer_rx
3859                .try_recv()
3860                .expect("second project push")
3861                .header
3862                .channel,
3863        ]
3864        .into_iter()
3865        .collect();
3866        assert_eq!(project_channels, HashSet::from([1, 2]));
3867        assert!(writer_rx.try_recv().is_err());
3868    }
3869
3870    #[test]
3871    fn push_buffer_drops_oldest_per_replay_key() {
3872        let (_root_dir, root) = test_root("subc-buffer-bound-root");
3873        let key = ReplayKey {
3874            root,
3875            harness: "opencode".to_string(),
3876            session: "session-1".to_string(),
3877        };
3878        let mut push_buffer = HashMap::new();
3879        let total = PUSH_BUFFER_MAX_PER_KEY + 3;
3880
3881        for index in 0..total {
3882            buffer_push_frame(
3883                &mut push_buffer,
3884                key.clone(),
3885                completion_frame(&format!("task-{index}")),
3886            );
3887        }
3888
3889        let buffered = push_buffer.get(&key).expect("buffer entry");
3890        assert_eq!(buffered.len(), PUSH_BUFFER_MAX_PER_KEY);
3891        let tasks: Vec<String> = buffered
3892            .iter()
3893            .filter_map(completion_task)
3894            .map(str::to_string)
3895            .collect();
3896        assert_eq!(tasks.first().map(String::as_str), Some("task-3"));
3897        assert_eq!(
3898            tasks.last().map(String::as_str),
3899            Some(format!("task-{}", total - 1).as_str())
3900        );
3901    }
3902
3903    #[test]
3904    fn replay_buffered_push_frames_drains_to_bound_channel() {
3905        let (_root_dir, root) = test_root("subc-buffer-replay-root");
3906        let key = ReplayKey {
3907            root,
3908            harness: "opencode".to_string(),
3909            session: "session-1".to_string(),
3910        };
3911        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);
3912        let mut push_buffer = HashMap::new();
3913        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-a"));
3914        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-b"));
3915
3916        let replayed = replay_buffered_push_frames(
3917            &writer_tx,
3918            route_key(3),
3919            &mut push_buffer,
3920            &key,
3921            BindTrust::FirstParty,
3922        );
3923
3924        assert_eq!(replayed, 2);
3925        assert!(!push_buffer.contains_key(&key));
3926        for expected_task in ["task-a", "task-b"] {
3927            let frame = writer_rx.try_recv().expect("replayed push");
3928            assert_eq!(frame.header.ty, FrameType::Push);
3929            assert_eq!(frame.header.channel, 3);
3930            let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
3931            assert_eq!(body["task_id"].as_str(), Some(expected_task));
3932        }
3933        assert!(writer_rx.try_recv().is_err());
3934    }
3935
3936    #[test]
3937    fn replay_buffered_push_frames_skips_bash_for_untrusted_route() {
3938        let (_root_dir, root) = test_root("subc-buffer-replay-untrusted-root");
3939        let key = ReplayKey {
3940            root,
3941            harness: "mcp".to_string(),
3942            session: "session-1".to_string(),
3943        };
3944        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);
3945        let mut push_buffer = HashMap::new();
3946        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-a"));
3947
3948        let replayed = replay_buffered_push_frames(
3949            &writer_tx,
3950            route_key(3),
3951            &mut push_buffer,
3952            &key,
3953            BindTrust::Untrusted,
3954        );
3955
3956        assert_eq!(replayed, 0);
3957        assert!(!push_buffer.contains_key(&key));
3958        assert!(writer_rx.try_recv().is_err());
3959    }
3960
3961    #[test]
3962    fn coalesce_push_batch_collapses_lossy_and_preserves_reliable_fifo() {
3963        let (_root_dir, root) = test_root("subc-coalesce-root");
3964        let (_other_dir, other_root) = test_root("subc-coalesce-other");
3965
3966        let output = coalesce_push_batch(vec![
3967            (root.clone(), status_frame(1)),
3968            (root.clone(), completion_frame("task-1")),
3969            (root.clone(), status_frame(2)),
3970            (root.clone(), completion_frame("task-2")),
3971            (root.clone(), long_running_frame("long-task", 100)),
3972            (root.clone(), long_running_frame("long-task", 200)),
3973            (other_root.clone(), status_frame(9)),
3974        ]);
3975
3976        let completion_tasks: Vec<_> = output
3977            .iter()
3978            .filter_map(|(_, frame)| completion_task(frame))
3979            .collect();
3980        assert_eq!(completion_tasks, vec!["task-1", "task-2"]);
3981
3982        let root_statuses: Vec<_> = output
3983            .iter()
3984            .filter(|(output_root, _)| output_root == &root)
3985            .filter_map(|(_, frame)| status_seq(frame))
3986            .collect();
3987        assert_eq!(root_statuses, vec![2]);
3988
3989        let other_statuses: Vec<_> = output
3990            .iter()
3991            .filter(|(output_root, _)| output_root == &other_root)
3992            .filter_map(|(_, frame)| status_seq(frame))
3993            .collect();
3994        assert_eq!(other_statuses, vec![9]);
3995
3996        let long_running_elapsed: Vec<_> = output
3997            .iter()
3998            .filter_map(|(_, frame)| match frame {
3999                PushFrame::BashLongRunning(long_running) => Some(long_running.elapsed_ms),
4000                _ => None,
4001            })
4002            .collect();
4003        assert_eq!(long_running_elapsed, vec![200]);
4004    }
4005
4006    #[test]
4007    fn coalesce_push_batch_keeps_progress_stream_keys_separate() {
4008        let (_root_dir, root) = test_root("subc-progress-coalesce-root");
4009
4010        let output = coalesce_push_batch(vec![
4011            (
4012                root.clone(),
4013                progress_frame("request-1", ProgressKind::Stdout, "old stdout"),
4014            ),
4015            (
4016                root.clone(),
4017                progress_frame("request-1", ProgressKind::Stderr, "stderr"),
4018            ),
4019            (
4020                root.clone(),
4021                progress_frame("request-2", ProgressKind::Stdout, "other stdout"),
4022            ),
4023            (
4024                root.clone(),
4025                progress_frame("request-1", ProgressKind::Stdout, "new stdout"),
4026            ),
4027        ]);
4028
4029        let progress: Vec<_> = output
4030            .iter()
4031            .filter_map(|(_, frame)| match frame {
4032                PushFrame::Progress(progress) => Some((
4033                    progress.request_id.as_str(),
4034                    match progress.kind {
4035                        ProgressKind::Stdout => "stdout",
4036                        ProgressKind::Stderr => "stderr",
4037                    },
4038                    progress.chunk.as_str(),
4039                )),
4040                _ => None,
4041            })
4042            .collect();
4043
4044        assert_eq!(
4045            progress,
4046            vec![
4047                ("request-1", "stderr", "stderr"),
4048                ("request-2", "stdout", "other stdout"),
4049                ("request-1", "stdout", "new stdout"),
4050            ]
4051        );
4052    }
4053
4054    #[test]
4055    fn progress_sender_keeps_reliable_off_saturated_lossy_funnel_without_blocking() {
4056        let (_root_dir, root) = test_root("subc-push-full-root");
4057        let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1);
4058        let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
4059        let sender = progress_sender_for_root(
4060            PushSenders {
4061                lossy_tx,
4062                reliable_tx,
4063            },
4064            root.clone(),
4065        );
4066
4067        let started = Instant::now();
4068        sender(status_frame(1));
4069        sender(status_frame(2));
4070        sender(completion_frame("reliable-after-lossy-full"));
4071        assert!(
4072            started.elapsed() < Duration::from_millis(50),
4073            "saturated push sender must return immediately"
4074        );
4075
4076        let (received_root, received_frame) =
4077            lossy_rx.try_recv().expect("first lossy frame queued");
4078        assert_eq!(received_root, root);
4079        assert_eq!(status_seq(&received_frame), Some(1));
4080        assert!(
4081            lossy_rx.try_recv().is_err(),
4082            "second lossy frame should be dropped"
4083        );
4084
4085        let (reliable_root, reliable_frame) = reliable_rx
4086            .try_recv()
4087            .expect("reliable frame bypasses lossy backpressure");
4088        assert_eq!(reliable_root, root);
4089        assert_eq!(
4090            completion_task(&reliable_frame),
4091            Some("reliable-after-lossy-full")
4092        );
4093        assert!(reliable_rx.try_recv().is_err());
4094    }
4095
4096    #[test]
4097    fn fan_out_lossy_push_frame_drops_when_writer_is_full_without_blocking() {
4098        let (_root_dir, root) = test_root("subc-writer-full-root");
4099        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
4100        writer_tx
4101            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
4102            .expect("prefill writer queue");
4103
4104        let mut root_channels = HashMap::new();
4105        root_channels.insert(root.clone(), HashSet::from([route_key(7)]));
4106
4107        let routes = HashMap::new();
4108        let started = Instant::now();
4109        let result =
4110            fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(1));
4111        assert!(
4112            started.elapsed() < Duration::from_millis(50),
4113            "saturated writer fan-out must return immediately"
4114        );
4115        assert_eq!(
4116            result,
4117            FanOutResult {
4118                matched_channels: 1,
4119                sent_frames: 0,
4120            }
4121        );
4122
4123        let queued = writer_rx
4124            .try_recv()
4125            .expect("prefilled frame remains queued");
4126        assert_eq!(queued.header.ty, FrameType::Ping);
4127        assert!(
4128            writer_rx.try_recv().is_err(),
4129            "push should be dropped on full writer"
4130        );
4131    }
4132
4133    #[test]
4134    fn reliable_push_backpressure_buffers_and_retries_on_tick() {
4135        let (_root_dir, root) = test_root("subc-retry-buffer-root");
4136        let identity = route_identity(&root, "session-1");
4137        let key = ReplayKey::from_identity(&identity);
4138        let mut routes = HashMap::new();
4139        routes.insert(route_key(9), identity.clone());
4140        let mut root_channels = HashMap::new();
4141        root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
4142        let mut session_identity = HashMap::new();
4143        remember_session_identity(&mut session_identity, &identity);
4144        let mut retry_buffer = HashMap::new();
4145        let mut push_buffer = HashMap::new();
4146        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
4147        writer_tx
4148            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
4149            .expect("prefill writer queue");
4150
4151        let result = fan_out_reliable_push_frame(
4152            &writer_tx,
4153            &routes,
4154            &root_channels,
4155            &session_identity,
4156            &mut retry_buffer,
4157            &mut push_buffer,
4158            &root,
4159            &completion_frame("retry-task"),
4160        );
4161
4162        assert_eq!(
4163            result,
4164            FanOutResult {
4165                matched_channels: 1,
4166                sent_frames: 0,
4167            }
4168        );
4169        assert!(push_buffer.is_empty());
4170        assert_eq!(retry_buffer.get(&route_key(9)).map(VecDeque::len), Some(1));
4171        assert_eq!(&retry_buffer[&route_key(9)][0].0, &key);
4172
4173        let queued = writer_rx.try_recv().expect("prefilled frame");
4174        assert_eq!(queued.header.ty, FrameType::Ping);
4175        assert_eq!(
4176            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
4177            1
4178        );
4179        let retried = writer_rx.try_recv().expect("retried reliable push");
4180        assert_eq!(retried.header.ty, FrameType::Push);
4181        assert_eq!(retried.header.channel, 9);
4182        assert_eq!(push_frame_task_id(&retried).as_deref(), Some("retry-task"));
4183        assert!(!retry_buffer.contains_key(&route_key(9)));
4184    }
4185
4186    #[test]
4187    fn reliable_push_fifo_gates_new_frames_behind_retry_buffer() {
4188        let (_root_dir, root) = test_root("subc-retry-fifo-root");
4189        let identity = route_identity(&root, "session-1");
4190        let mut routes = HashMap::new();
4191        routes.insert(route_key(9), identity.clone());
4192        let mut root_channels = HashMap::new();
4193        root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
4194        let mut session_identity = HashMap::new();
4195        remember_session_identity(&mut session_identity, &identity);
4196        let mut retry_buffer = HashMap::new();
4197        let mut push_buffer = HashMap::new();
4198        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
4199        writer_tx
4200            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
4201            .expect("prefill writer queue");
4202
4203        let first = completion_frame("fifo-1");
4204        let second = completion_frame("fifo-2");
4205        let _ = fan_out_reliable_push_frame(
4206            &writer_tx,
4207            &routes,
4208            &root_channels,
4209            &session_identity,
4210            &mut retry_buffer,
4211            &mut push_buffer,
4212            &root,
4213            &first,
4214        );
4215        let queued = writer_rx.try_recv().expect("free writer capacity");
4216        assert_eq!(queued.header.ty, FrameType::Ping);
4217
4218        let _ = fan_out_reliable_push_frame(
4219            &writer_tx,
4220            &routes,
4221            &root_channels,
4222            &session_identity,
4223            &mut retry_buffer,
4224            &mut push_buffer,
4225            &root,
4226            &second,
4227        );
4228        assert!(
4229            writer_rx.try_recv().is_err(),
4230            "second reliable frame must not bypass pending retry frame"
4231        );
4232        let queued_tasks: Vec<_> = retry_buffer[&route_key(9)]
4233            .iter()
4234            .filter_map(|(_, frame)| completion_task(frame))
4235            .collect();
4236        assert_eq!(queued_tasks, vec!["fifo-1", "fifo-2"]);
4237
4238        assert_eq!(
4239            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
4240            1
4241        );
4242        let first_sent = writer_rx.try_recv().expect("first reliable push");
4243        assert_eq!(push_frame_task_id(&first_sent).as_deref(), Some("fifo-1"));
4244        assert_eq!(
4245            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
4246            1
4247        );
4248        let second_sent = writer_rx.try_recv().expect("second reliable push");
4249        assert_eq!(push_frame_task_id(&second_sent).as_deref(), Some("fifo-2"));
4250        assert!(!retry_buffer.contains_key(&route_key(9)));
4251    }
4252
4253    #[test]
4254    fn replay_buffered_push_frames_drains_incrementally_on_backpressure() {
4255        let (_root_dir, root) = test_root("subc-incremental-replay-root");
4256        let key = ReplayKey {
4257            root,
4258            harness: "opencode".to_string(),
4259            session: "session-1".to_string(),
4260        };
4261        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(2);
4262        writer_tx
4263            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
4264            .expect("prefill writer queue");
4265        let mut push_buffer = HashMap::new();
4266        for task in ["replay-1", "replay-2", "replay-3"] {
4267            buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(task));
4268        }
4269
4270        assert_eq!(
4271            replay_buffered_push_frames(
4272                &writer_tx,
4273                route_key(4),
4274                &mut push_buffer,
4275                &key,
4276                BindTrust::FirstParty
4277            ),
4278            1
4279        );
4280        assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(2));
4281        let remaining: Vec<_> = push_buffer[&key]
4282            .iter()
4283            .filter_map(completion_task)
4284            .collect();
4285        assert_eq!(remaining, vec!["replay-2", "replay-3"]);
4286
4287        let queued = writer_rx.try_recv().expect("prefilled frame");
4288        assert_eq!(queued.header.ty, FrameType::Ping);
4289        let first = writer_rx.try_recv().expect("first replayed push");
4290        assert_eq!(push_frame_task_id(&first).as_deref(), Some("replay-1"));
4291
4292        assert_eq!(
4293            replay_buffered_push_frames(
4294                &writer_tx,
4295                route_key(4),
4296                &mut push_buffer,
4297                &key,
4298                BindTrust::FirstParty
4299            ),
4300            2
4301        );
4302        let second = writer_rx.try_recv().expect("second replayed push");
4303        let third = writer_rx.try_recv().expect("third replayed push");
4304        assert_eq!(push_frame_task_id(&second).as_deref(), Some("replay-2"));
4305        assert_eq!(push_frame_task_id(&third).as_deref(), Some("replay-3"));
4306        assert!(!push_buffer.contains_key(&key));
4307    }
4308
4309    #[test]
4310    fn goodbye_migrates_retry_buffer_into_detach_replay() {
4311        let (_root_dir, root) = test_root("subc-goodbye-migration-root");
4312        let key = ReplayKey {
4313            root,
4314            harness: "opencode".to_string(),
4315            session: "session-1".to_string(),
4316        };
4317        let mut retry_buffer = HashMap::new();
4318        buffer_retry_frame(
4319            &mut retry_buffer,
4320            route_key(5),
4321            key.clone(),
4322            completion_frame("migrated-task"),
4323        );
4324        let mut push_buffer = HashMap::new();
4325
4326        assert_eq!(
4327            migrate_retry_buffer_to_push_buffer(&mut retry_buffer, route_key(5), &mut push_buffer),
4328            1
4329        );
4330
4331        assert!(!retry_buffer.contains_key(&route_key(5)));
4332        assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(1));
4333        assert_eq!(
4334            completion_task(&push_buffer[&key][0]),
4335            Some("migrated-task")
4336        );
4337    }
4338
4339    #[test]
4340    fn permanent_push_send_failure_is_dropped_not_retried_forever() {
4341        let (_root_dir, root) = test_root("subc-permanent-failure-root");
4342        let key = ReplayKey {
4343            root,
4344            harness: "opencode".to_string(),
4345            session: "session-1".to_string(),
4346        };
4347        let (writer_tx, writer_rx) = mpsc::channel::<Frame>(1);
4348        drop(writer_rx);
4349
4350        let mut push_buffer = HashMap::new();
4351        buffer_push_frame(
4352            &mut push_buffer,
4353            key.clone(),
4354            completion_frame("closed-replay"),
4355        );
4356        assert_eq!(
4357            replay_buffered_push_frames(
4358                &writer_tx,
4359                route_key(4),
4360                &mut push_buffer,
4361                &key,
4362                BindTrust::FirstParty
4363            ),
4364            0
4365        );
4366        assert!(!push_buffer.contains_key(&key));
4367
4368        let mut retry_buffer = HashMap::new();
4369        buffer_retry_frame(
4370            &mut retry_buffer,
4371            route_key(4),
4372            key,
4373            completion_frame("closed-retry"),
4374        );
4375        assert_eq!(
4376            drain_retry_buffer_for_channel(&writer_tx, route_key(4), &mut retry_buffer),
4377            0
4378        );
4379        assert!(!retry_buffer.contains_key(&route_key(4)));
4380    }
4381
4382    #[test]
4383    fn completed_task_suppresses_stale_long_running_lossy_push() {
4384        let mut completed_tasks = CompletedTaskIds::default();
4385        assert!(!should_drop_lossy_push(
4386            &completed_tasks,
4387            &long_running_frame("stale-task", 100)
4388        ));
4389
4390        completed_tasks.remember("stale-task");
4391
4392        assert!(should_drop_lossy_push(
4393            &completed_tasks,
4394            &long_running_frame("stale-task", 200)
4395        ));
4396        assert!(!should_drop_lossy_push(
4397            &completed_tasks,
4398            &long_running_frame("other-task", 200)
4399        ));
4400    }
4401
4402    #[test]
4403    fn arm_bg_wake_bumps_epoch_even_when_channel_is_already_pending() {
4404        let (_root_dir, root) = test_root("subc-bg-wake-epoch-root");
4405        let session = "session-1".to_string();
4406        let key = (root.clone(), session.clone());
4407        let channel = route_key(7);
4408        let mut bg_wake_pending = HashSet::from([channel]);
4409        let mut bg_wake_epoch = HashMap::from([(key.clone(), 41_u64)]);
4410
4411        arm_bg_wake(
4412            root,
4413            session,
4414            channel,
4415            &mut bg_wake_pending,
4416            &mut bg_wake_epoch,
4417        );
4418
4419        assert_eq!(bg_wake_pending, HashSet::from([channel]));
4420        assert_eq!(bg_wake_epoch.get(&key).copied(), Some(42));
4421    }
4422
4423    #[test]
4424    fn stale_maintenance_epoch_does_not_clear_newer_bg_wake() {
4425        let (_root_dir, root) = test_root("subc-bg-wake-stale-root");
4426        let session = "session-1".to_string();
4427        let key = (root.clone(), session.clone());
4428        let channel = route_key(8);
4429        let mut bg_sub_by_session = HashMap::new();
4430        bg_sub_by_session.insert(key.clone(), channel);
4431        let mut bg_wake_pending = HashSet::new();
4432        let mut bg_wake_epoch = HashMap::new();
4433
4434        arm_bg_wake(
4435            root.clone(),
4436            session.clone(),
4437            channel,
4438            &mut bg_wake_pending,
4439            &mut bg_wake_epoch,
4440        );
4441        let epoch_at_submit = bg_wake_epoch[&key];
4442        arm_bg_wake(
4443            root.clone(),
4444            session.clone(),
4445            channel,
4446            &mut bg_wake_pending,
4447            &mut bg_wake_epoch,
4448        );
4449
4450        clear_stale_bg_wakes_for_empty_sessions(
4451            &root,
4452            &[(session, epoch_at_submit)],
4453            &bg_sub_by_session,
4454            &mut bg_wake_pending,
4455            &bg_wake_epoch,
4456        );
4457
4458        assert!(bg_wake_pending.contains(&channel));
4459        assert_eq!(bg_wake_epoch.get(&key).copied(), Some(epoch_at_submit + 1));
4460    }
4461
4462    #[test]
4463    fn matching_maintenance_epoch_clears_genuinely_stale_bg_wake() {
4464        let (_root_dir, root) = test_root("subc-bg-wake-clear-root");
4465        let session = "session-1".to_string();
4466        let key = (root.clone(), session.clone());
4467        let channel = route_key(9);
4468        let mut bg_sub_by_session = HashMap::new();
4469        bg_sub_by_session.insert(key.clone(), channel);
4470        let mut bg_wake_pending = HashSet::new();
4471        let mut bg_wake_epoch = HashMap::new();
4472
4473        arm_bg_wake(
4474            root.clone(),
4475            session.clone(),
4476            channel,
4477            &mut bg_wake_pending,
4478            &mut bg_wake_epoch,
4479        );
4480        let epoch_at_submit = bg_wake_epoch[&key];
4481
4482        clear_stale_bg_wakes_for_empty_sessions(
4483            &root,
4484            &[(session, epoch_at_submit)],
4485            &bg_sub_by_session,
4486            &mut bg_wake_pending,
4487            &bg_wake_epoch,
4488        );
4489
4490        assert!(!bg_wake_pending.contains(&channel));
4491    }
4492
4493    #[test]
4494    fn response_is_fatal_panic_only_matches_panic_exclusive_code() {
4495        let tool_error = Response::error("request-1", "internal_error", "ordinary tool error");
4496        let panic_error = Response::error("request-2", "actor_fatal", "mutating panic");
4497
4498        assert!(!response_is_fatal_panic(&tool_error));
4499        assert!(response_is_fatal_panic(&panic_error));
4500    }
4501
4502    #[tokio::test]
4503    async fn persistent_cancel_resolves_when_fired_before_await() {
4504        // The lost-wakeup guard: cancel() fires exactly once via notify_waiters()
4505        // (no stored permit). A waiter that registers AFTER the cancel must still
4506        // observe it via the flag; a waiter racing the cancel must still be woken.
4507        let signal = PersistentCancelSignal::new();
4508        signal.cancel();
4509        // Fired before we ever call cancelled() — must return immediately, not park.
4510        tokio::time::timeout(Duration::from_secs(1), signal.cancelled())
4511            .await
4512            .expect("cancelled() must resolve when cancel fired beforehand");
4513
4514        // A fresh signal cancelled concurrently with an in-flight cancelled().
4515        let racing = PersistentCancelSignal::new();
4516        let racing_for_task = racing.clone();
4517        let waiter = tokio::spawn(async move { racing_for_task.cancelled().await });
4518        racing.cancel();
4519        tokio::time::timeout(Duration::from_secs(1), waiter)
4520            .await
4521            .expect("cancelled() must resolve when cancel races the await")
4522            .expect("waiter task panicked");
4523    }
4524
4525    #[tokio::test]
4526    async fn control_send_times_out_when_writer_queue_remains_full() {
4527        let (writer_tx, _writer_rx) = mpsc::channel::<Frame>(1);
4528        writer_tx
4529            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
4530            .expect("prefill writer queue");
4531        let started = Instant::now();
4532
4533        let result = send_frame(
4534            &writer_tx,
4535            Frame::build(FrameType::Pong, control_flags(), 0, 2, Vec::new()).unwrap(),
4536        )
4537        .await;
4538
4539        assert!(matches!(result, Err(SubcError::WriterBackpressureTimeout)));
4540        assert!(
4541            started.elapsed() < Duration::from_secs(2),
4542            "control send guard should be bounded"
4543        );
4544    }
4545
4546    const CORE_TOOLS: [&str; 21] = [
4547        "status",
4548        "bash",
4549        "read",
4550        "write",
4551        "edit",
4552        "apply_patch",
4553        "grep",
4554        "glob",
4555        "search",
4556        "outline",
4557        "zoom",
4558        "inspect",
4559        "callgraph",
4560        "conflicts",
4561        "ast_search",
4562        "ast_replace",
4563        "delete",
4564        "move",
4565        "import",
4566        "refactor",
4567        "safety",
4568    ];
4569
4570    fn is_bare_placeholder_schema(schema: &Value) -> bool {
4571        schema == &json!({ "type": "object" })
4572    }
4573
4574    #[test]
4575    fn build_manifest_serves_embedded_tool_schemas() {
4576        let manifest = build_manifest();
4577        let tools = match manifest.provides.first() {
4578            Some(ProviderRole::ToolProvider { tools, .. }) => tools,
4579            _ => panic!("expected ToolProvider"),
4580        };
4581        let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
4582        for name in CORE_TOOLS {
4583            let tool = by_name
4584                .get(name)
4585                .unwrap_or_else(|| panic!("missing tool {name}"));
4586            assert!(
4587                !is_bare_placeholder_schema(&tool.schema),
4588                "{name} must not use bare placeholder schema"
4589            );
4590            assert_eq!(
4591                tool.schema.get("type").and_then(|v| v.as_str()),
4592                Some("object"),
4593                "{name} schema must be an object"
4594            );
4595        }
4596
4597        let read = by_name["read"]
4598            .schema
4599            .get("properties")
4600            .and_then(|p| p.as_object());
4601        let read_props = read.expect("read schema properties");
4602        assert!(
4603            read_props.contains_key("filePath"),
4604            "read schema must expose filePath"
4605        );
4606
4607        let status = &by_name["status"].schema;
4608        assert_eq!(
4609            status.get("properties").and_then(|v| v.as_object()),
4610            Some(&serde_json::Map::new()),
4611            "status schema must have empty properties"
4612        );
4613        assert_eq!(
4614            status.get("additionalProperties").and_then(|v| v.as_bool()),
4615            Some(false),
4616            "status schema must forbid additionalProperties"
4617        );
4618    }
4619
4620    #[test]
4621    fn build_manifest_classifies_execution_mode_by_observable_effect() {
4622        let manifest = build_manifest();
4623        let tools = match manifest.provides.first() {
4624            Some(ProviderRole::ToolProvider { tools, .. }) => tools,
4625            _ => panic!("expected ToolProvider"),
4626        };
4627        let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
4628
4629        // Readers warm AFT's own index/cache/symbol artifacts (internal ctx
4630        // mutation), not the user's observable workspace, so they are Pure.
4631        for name in [
4632            "status",
4633            "read",
4634            "grep",
4635            "glob",
4636            "search",
4637            "outline",
4638            "zoom",
4639            "inspect",
4640            "callgraph",
4641            "conflicts",
4642            "ast_search",
4643        ] {
4644            assert_eq!(
4645                by_name[name].execution_mode,
4646                ExecutionMode::Pure,
4647                "{name} produces no observable side effect and must be Pure"
4648            );
4649        }
4650        // Mutating tools can write files, change safety state, or spawn processes.
4651        for name in [
4652            "bash",
4653            "write",
4654            "edit",
4655            "apply_patch",
4656            "ast_replace",
4657            "delete",
4658            "move",
4659            "import",
4660            "refactor",
4661            "safety",
4662        ] {
4663            assert_eq!(
4664                by_name[name].execution_mode,
4665                ExecutionMode::Mutating,
4666                "{name} writes files and must be Mutating"
4667            );
4668        }
4669    }
4670
4671    #[test]
4672    fn subc_agent_lanes_classify_new_read_tools() {
4673        assert_eq!(command_lane("callgraph"), Lane::HeavyInit);
4674        assert_eq!(command_lane("conflicts"), Lane::PureRead);
4675    }
4676
4677    #[test]
4678    fn native_plumbing_allowlist_admits_exactly_drain_and_ack() {
4679        // BC2: the route gate admits a name when it's an agent core tool OR a
4680        // native plumbing command. These two carry no agent surface and no
4681        // config/trust surface, so they're admitted to dispatch over a bound
4682        // route while everything else (notably `configure`) stays fail-closed.
4683        assert!(is_subc_native_plumbing_tool("bash_drain_completions"));
4684        assert!(is_subc_native_plumbing_tool("bash_ack_completions"));
4685
4686        // The allowlist is TIGHT — it must not admit the config-bypass vector
4687        // the fail-closed gate exists to block, nor any other native command.
4688        assert!(!is_subc_native_plumbing_tool("configure"));
4689        assert!(!is_subc_native_plumbing_tool("bash"));
4690        assert!(!is_subc_native_plumbing_tool("bash_kill"));
4691        assert!(!is_subc_native_plumbing_tool("db_set_state"));
4692        assert!(!is_subc_native_plumbing_tool("undo"));
4693
4694        // The plumbing commands are NOT agent-facing tools — they must stay out
4695        // of the manifest gate so they never reach the model surface.
4696        assert!(!is_subc_agent_core_tool("bash_drain_completions"));
4697        assert!(!is_subc_agent_core_tool("bash_ack_completions"));
4698
4699        // Lanes are already assigned (pre-existing): drain reads, ack mutates.
4700        assert_eq!(command_lane("bash_drain_completions"), Lane::PureRead);
4701        assert_eq!(command_lane("bash_ack_completions"), Lane::Mutating);
4702    }
4703
4704    #[test]
4705    fn tool_response_frame_carries_flat_standalone_shape_in_structured_content() {
4706        use crate::protocol::Response;
4707
4708        // A response with sidecars the FIRST-PARTY plugin drives UI from
4709        // (status_bar, bg_completions, code) plus a normal result field.
4710        let response = Response::success(
4711            "req-7",
4712            json!({
4713                "complete": true,
4714                "matches": 3,
4715                "status_bar": { "errors": 0, "warnings": 1 },
4716                "bg_completions": [{ "task_id": "bash-abc" }],
4717            }),
4718        );
4719        let result = ToolCallResult {
4720            text: "rendered text".to_string(),
4721            response,
4722        };
4723
4724        // The flat shape must equal the standalone NDJSON `tool_call` body:
4725        // {id, success, ...data, text}. Build the standalone expectation the
4726        // same way commands::tool_call::response_with_text does.
4727        let expected_flat = json!({
4728            "id": "req-7",
4729            "success": true,
4730            "complete": true,
4731            "matches": 3,
4732            "status_bar": { "errors": 0, "warnings": 1 },
4733            "bg_completions": [{ "task_id": "bash-abc" }],
4734            "text": "rendered text",
4735        });
4736        assert_eq!(
4737            flat_tool_response(&result.response, &result.text),
4738            expected_flat,
4739            "structuredContent must be byte-identical to the standalone flat response"
4740        );
4741
4742        // The frame body carries the MCP surface for generic hosts AND the flat
4743        // sidecar shape under structuredContent for the first-party plugin.
4744        let frame =
4745            build_tool_response_frame(PROTOCOL_VERSION, 1, 42, control_flags(), &result).unwrap();
4746        let body: Value = serde_json::from_slice(&frame.body).unwrap();
4747        assert_eq!(body["isError"], json!(false));
4748        assert_eq!(body["content"][0]["type"], json!("text"));
4749        assert_eq!(body["content"][0]["text"], json!("rendered text"));
4750        assert_eq!(body["structuredContent"], expected_flat);
4751
4752        // A failed response flips isError and still carries the flat shape
4753        // (with success:false + code) for the plugin's error path.
4754        let err = Response::error_with_data(
4755            "req-8",
4756            "ambiguous_match",
4757            "too many matches",
4758            json!({ "candidates": ["a", "b"] }),
4759        );
4760        let err_result = ToolCallResult {
4761            text: "error text".to_string(),
4762            response: err,
4763        };
4764        let err_frame =
4765            build_tool_response_frame(PROTOCOL_VERSION, 1, 43, control_flags(), &err_result)
4766                .unwrap();
4767        let err_body: Value = serde_json::from_slice(&err_frame.body).unwrap();
4768        assert_eq!(err_body["isError"], json!(true));
4769        assert_eq!(err_body["structuredContent"]["success"], json!(false));
4770        assert_eq!(
4771            err_body["structuredContent"]["code"],
4772            json!("ambiguous_match")
4773        );
4774        assert_eq!(
4775            err_body["structuredContent"]["candidates"],
4776            json!(["a", "b"])
4777        );
4778        assert_eq!(err_body["structuredContent"]["text"], json!("error text"));
4779    }
4780}
4781
4782#[derive(Debug)]
4783pub enum SubcError {
4784    Runtime(std::io::Error),
4785    ConnectionFile {
4786        path: PathBuf,
4787        source: subc_transport::ConnectionFileError,
4788    },
4789    NoEndpoint {
4790        path: PathBuf,
4791    },
4792    InvalidEndpoint {
4793        path: PathBuf,
4794        endpoint: String,
4795    },
4796    Connect {
4797        endpoint: String,
4798        source: std::io::Error,
4799    },
4800    Auth {
4801        endpoint: String,
4802        source: subc_transport::AuthError,
4803    },
4804    FrameIo(subc_transport::FrameIoError),
4805    FrameBuild(subc_protocol::FrameBuildError),
4806    WriterClosed,
4807    WriterBackpressureTimeout,
4808    WriterJoin(tokio::task::JoinError),
4809    Json(serde_json::Error),
4810    ClosedBeforeHelloAck,
4811    HelloRejected {
4812        body: Option<ErrorBody>,
4813    },
4814    UnexpectedFrame {
4815        ty: FrameType,
4816    },
4817}
4818
4819impl fmt::Display for SubcError {
4820    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4821        match self {
4822            Self::Runtime(e) => write!(f, "failed to build subc tokio runtime: {e}"),
4823            Self::ConnectionFile { path, source } => {
4824                write!(f, "failed to read subc connection file {path:?}: {source}")
4825            }
4826            Self::NoEndpoint { path } => {
4827                write!(f, "subc connection file {path:?} has no endpoints")
4828            }
4829            Self::InvalidEndpoint { path, endpoint } => {
4830                write!(
4831                    f,
4832                    "subc connection file {path:?} has invalid endpoint {endpoint}"
4833                )
4834            }
4835            Self::Connect { endpoint, source } => {
4836                write!(f, "failed to connect to subc endpoint {endpoint}: {source}")
4837            }
4838            Self::Auth { endpoint, source } => {
4839                write!(
4840                    f,
4841                    "failed to authenticate to subc endpoint {endpoint}: {source}"
4842                )
4843            }
4844            Self::FrameIo(e) => write!(f, "subc frame I/O error: {e}"),
4845            Self::FrameBuild(e) => write!(f, "subc frame build error: {e}"),
4846            Self::WriterClosed => write!(f, "subc writer task closed"),
4847            Self::WriterBackpressureTimeout => write!(
4848                f,
4849                "subc writer task stayed backpressured while sending a control frame"
4850            ),
4851            Self::WriterJoin(e) => write!(f, "subc writer task join error: {e}"),
4852            Self::Json(e) => write!(f, "subc JSON error: {e}"),
4853            Self::ClosedBeforeHelloAck => {
4854                write!(f, "subc daemon closed the connection before HelloAck")
4855            }
4856            Self::HelloRejected { body } => match body {
4857                Some(b) => write!(f, "subc rejected ModuleHello: {} ({})", b.code, b.message),
4858                None => write!(f, "subc rejected ModuleHello (unparseable error body)"),
4859            },
4860            Self::UnexpectedFrame { ty } => {
4861                write!(f, "subc sent unexpected frame in place of HelloAck: {ty:?}")
4862            }
4863        }
4864    }
4865}
4866
4867impl std::error::Error for SubcError {}