Skip to main content

aft/
subc.rs

1//! subc daemon attach — transport edge.
2//!
3//! When AFT is launched as `aft --subc <connection-file>`, it does NOT run the
4//! standalone NDJSON-over-stdin loop. Instead it connects to a running subc
5//! daemon over loopback TCP, authenticates with the pre-envelope HMAC handshake
6//! (`subc-transport`), then speaks the subc frame protocol (`subc-protocol`):
7//! ModuleHello → HelloAck (register as a tool provider), then a channel-0
8//! control loop (Ping/Pong, RouteBind) plus route-channel tool calls.
9//!
10//! Concurrency: subc routes tool calls through the executor. The tokio
11//! edge never dispatches against `AppContext` inline; per-actor executor lanes
12//! own the reader/mutator epoch, while a writer task serializes outbound frames.
13
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::fmt;
16use std::net::{IpAddr, SocketAddr};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use serde::Deserialize;
22use serde_json::{json, Value};
23
24use crate::config::Config;
25use crate::config_resolve::ConfigTier;
26use crate::context::{App, AppContext, ProgressSender};
27use crate::executor::{Executor, Lane};
28use crate::log_ctx;
29use crate::path_identity::ProjectRootId;
30use crate::protocol::{ProgressKind, PushFrame, RawRequest, Response};
31use crate::runtime_drain;
32
33use subc_protocol::manifest::{
34    Bindings, Concurrency, ConfigBinding, ConfigSource, ExecutionMode, IdentityBinding,
35    IdentityScope, ModuleManifest, ProviderRole, StorageBinding, StorageKind, StorageScope, Tool,
36    TrustTier,
37};
38use subc_protocol::session::{ModuleControlRequest, ModuleControlResponse};
39use subc_protocol::{
40    ErrorBody, Flags, Frame, FrameType, ModuleHelloBody, Priority, PROTOCOL_VERSION,
41};
42use subc_transport::{authenticate_client, connection_file, read_frame, write_frame};
43use tokio::io::{AsyncRead, AsyncWrite};
44use tokio::net::TcpStream;
45use tokio::sync::{mpsc, oneshot, Notify};
46use tokio::task::JoinHandle;
47
48/// Handshake budget. subc binds-before-spawn, so a reachable daemon authenticates
49/// well within this; an unreachable/socket-stale daemon fails loud rather than
50/// silently downgrading to standalone (the --subc contract).
51const AUTH_DEADLINE: Duration = Duration::from_secs(5);
52
53/// Correlation id for the initial ModuleHello (channel 0).
54const HELLO_CORR: u64 = 1;
55
56/// Per-session in-memory replay cap for must-deliver Push frames. This covers
57/// detach/re-attach while AFT stays alive; cross-restart replay is phased later.
58const PUSH_BUFFER_MAX_PER_KEY: usize = 256;
59
60/// Bounded guard for control-frame sends. If the daemon stops reading and the
61/// writer queue stays full, tear the subc edge down instead of stalling the
62/// route loop indefinitely.
63const CONTROL_SEND_TIMEOUT: Duration = Duration::from_millis(250);
64
65/// Small bounded memory of completed task ids used to suppress stale lossy
66/// long-running reminders that arrive after their reliable completion event.
67const COMPLETED_TASK_SUPPRESSION_MAX: usize = 4096;
68
69type RouteChannel = u32;
70type PushEnvelope = (ProjectRootId, PushFrame);
71type RetryBuffer = HashMap<RouteChannel, VecDeque<(ReplayKey, PushFrame)>>;
72
73#[derive(Clone)]
74struct PushSenders {
75    lossy_tx: mpsc::Sender<PushEnvelope>,
76    reliable_tx: mpsc::UnboundedSender<PushEnvelope>,
77}
78
79#[derive(Debug)]
80struct RootMeta {
81    maintenance_pending: bool,
82    last_touched: Instant,
83    diagnostics_on_edit: bool,
84}
85
86#[derive(Debug)]
87struct PendingBind {
88    bind_root_id: ProjectRootId,
89    inserted_new_actor: bool,
90    cancelled: bool,
91}
92
93struct RouteBindCompletion {
94    route_channel: u16,
95    identity: RouteIdentity,
96    bind_root_id: ProjectRootId,
97    inserted_new_actor: bool,
98    configure_response: Response,
99    drain_response: Option<Response>,
100    diagnostics_on_edit: bool,
101    ver: u8,
102    corr: u64,
103    flags: Flags,
104}
105
106#[derive(Debug, Clone)]
107struct RouteIdentity {
108    root: ProjectRootId,
109    project_root: PathBuf,
110    harness: String,
111    session: String,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Hash)]
115struct ReplayKey {
116    root: ProjectRootId,
117    harness: String,
118    session: String,
119}
120
121impl ReplayKey {
122    fn from_identity(identity: &RouteIdentity) -> Self {
123        Self {
124            root: identity.root.clone(),
125            harness: identity.harness.clone(),
126            session: identity.session.clone(),
127        }
128    }
129}
130
131#[derive(Debug, Default)]
132struct CompletedTaskIds {
133    order: VecDeque<String>,
134    set: HashSet<String>,
135}
136
137impl CompletedTaskIds {
138    fn remember(&mut self, task_id: &str) {
139        if self.set.contains(task_id) {
140            return;
141        }
142        if self.order.len() >= COMPLETED_TASK_SUPPRESSION_MAX {
143            if let Some(evicted) = self.order.pop_front() {
144                self.set.remove(&evicted);
145            }
146        }
147        let task_id = task_id.to_string();
148        self.order.push_back(task_id.clone());
149        self.set.insert(task_id);
150    }
151
152    fn contains(&self, task_id: &str) -> bool {
153        self.set.contains(task_id)
154    }
155}
156
157impl RootMeta {
158    fn new(now: Instant) -> Self {
159        Self {
160            maintenance_pending: false,
161            last_touched: now,
162            diagnostics_on_edit: false,
163        }
164    }
165
166    fn touch(&mut self) {
167        self.last_touched = Instant::now();
168    }
169}
170
171fn route_key(channel: u16) -> RouteChannel {
172    RouteChannel::from(channel)
173}
174
175fn remove_root_channel(
176    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
177    root: &ProjectRootId,
178    channel: RouteChannel,
179) {
180    let remove_root = if let Some(channels) = root_channels.get_mut(root) {
181        channels.remove(&channel);
182        channels.is_empty()
183    } else {
184        false
185    };
186    if remove_root {
187        root_channels.remove(root);
188    }
189}
190
191fn remove_route_channel(
192    routes: &mut HashMap<RouteChannel, RouteIdentity>,
193    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
194    channel: RouteChannel,
195) -> Option<RouteIdentity> {
196    let removed = routes.remove(&channel);
197    if let Some(identity) = &removed {
198        remove_root_channel(root_channels, &identity.root, channel);
199    }
200    removed
201}
202
203fn insert_route_channel(
204    routes: &mut HashMap<RouteChannel, RouteIdentity>,
205    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
206    channel: RouteChannel,
207    identity: RouteIdentity,
208) {
209    if let Some(previous) = routes.insert(channel, identity.clone()) {
210        remove_root_channel(root_channels, &previous.root, channel);
211    }
212    root_channels
213        .entry(identity.root.clone())
214        .or_default()
215        .insert(channel);
216}
217
218fn remember_session_identity(
219    session_identity: &mut HashMap<(ProjectRootId, String), String>,
220    identity: &RouteIdentity,
221) {
222    // Retained after route Goodbye so reliable session-scoped frames emitted while
223    // the session is detached can still be keyed by the full (root,harness,session)
224    // replay triple. The idle-TTL actor reaper is responsible for pruning stale
225    // identities/buffers once an actor is evicted.
226    session_identity.insert(
227        (identity.root.clone(), identity.session.clone()),
228        identity.harness.clone(),
229    );
230}
231
232fn replay_key_for_session(
233    session_identity: &HashMap<(ProjectRootId, String), String>,
234    root: &ProjectRootId,
235    session: &str,
236) -> Option<ReplayKey> {
237    let harness = session_identity.get(&(root.clone(), session.to_string()))?;
238    Some(ReplayKey {
239        root: root.clone(),
240        harness: harness.clone(),
241        session: session.to_string(),
242    })
243}
244
245fn frame_session(frame: &PushFrame) -> Option<&str> {
246    match frame {
247        PushFrame::BashCompleted(completed) => Some(completed.session_id.as_str()),
248        PushFrame::BashLongRunning(long_running) => Some(long_running.session_id.as_str()),
249        PushFrame::BashPatternMatch(pattern_match) => Some(pattern_match.session_id.as_str()),
250        PushFrame::ConfigureWarnings(warnings) => warnings.session_id.as_deref(),
251        PushFrame::StatusChanged(status) => status.session_id.as_deref(),
252        PushFrame::Progress(_) => None,
253    }
254}
255
256fn frame_is_reliable(frame: &PushFrame) -> bool {
257    matches!(
258        frame,
259        PushFrame::BashCompleted(_)
260            | PushFrame::BashPatternMatch(_)
261            | PushFrame::ConfigureWarnings(_)
262    )
263}
264
265fn completed_task_id(frame: &PushFrame) -> Option<&str> {
266    match frame {
267        PushFrame::BashCompleted(completed) => Some(completed.task_id.as_str()),
268        _ => None,
269    }
270}
271
272fn long_running_task_id(frame: &PushFrame) -> Option<&str> {
273    match frame {
274        PushFrame::BashLongRunning(long_running) => Some(long_running.task_id.as_str()),
275        _ => None,
276    }
277}
278
279fn should_drop_lossy_push(completed_tasks: &CompletedTaskIds, frame: &PushFrame) -> bool {
280    long_running_task_id(frame).is_some_and(|task_id| completed_tasks.contains(task_id))
281}
282
283fn progress_sender_for_root(push_senders: PushSenders, root_id: ProjectRootId) -> ProgressSender {
284    Arc::new(Box::new(move |frame: PushFrame| {
285        // Emitters can run on executor workers, maintenance jobs, watcher drains,
286        // semantic refresh workers, or bg-bash watchdog threads. Never block any
287        // of them on subc routing/backpressure: reliable frames take an
288        // unbounded non-blocking lane; lossy frames stay bounded and coalesced.
289        if frame_is_reliable(&frame) {
290            let _ = push_senders.reliable_tx.send((root_id.clone(), frame));
291        } else {
292            let _ = push_senders.lossy_tx.try_send((root_id.clone(), frame));
293        }
294    }))
295}
296
297#[derive(Debug, Clone, PartialEq, Eq, Hash)]
298enum LossyProgressKind {
299    Stdout,
300    Stderr,
301}
302
303impl From<&ProgressKind> for LossyProgressKind {
304    fn from(kind: &ProgressKind) -> Self {
305        match kind {
306            ProgressKind::Stdout => Self::Stdout,
307            ProgressKind::Stderr => Self::Stderr,
308        }
309    }
310}
311
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313enum LossyPushKey {
314    Progress {
315        request_id: String,
316        kind: LossyProgressKind,
317    },
318    StatusChanged,
319    BashLongRunning {
320        task_id: String,
321    },
322}
323
324fn lossy_push_key(frame: &PushFrame) -> Option<LossyPushKey> {
325    match frame {
326        PushFrame::Progress(progress) => Some(LossyPushKey::Progress {
327            request_id: progress.request_id.clone(),
328            kind: LossyProgressKind::from(&progress.kind),
329        }),
330        PushFrame::StatusChanged(_) => Some(LossyPushKey::StatusChanged),
331        PushFrame::BashLongRunning(long_running) => Some(LossyPushKey::BashLongRunning {
332            task_id: long_running.task_id.clone(),
333        }),
334        PushFrame::BashCompleted(_)
335        | PushFrame::BashPatternMatch(_)
336        | PushFrame::ConfigureWarnings(_) => None,
337    }
338}
339
340fn coalesce_push_batch(batch: Vec<(ProjectRootId, PushFrame)>) -> Vec<(ProjectRootId, PushFrame)> {
341    let mut slots: Vec<Option<(ProjectRootId, PushFrame)>> = Vec::with_capacity(batch.len());
342    let mut latest_lossy: HashMap<(ProjectRootId, LossyPushKey), usize> = HashMap::new();
343
344    for (root, frame) in batch {
345        if let Some(lossy_key) = lossy_push_key(&frame) {
346            let map_key = (root.clone(), lossy_key);
347            if let Some(previous_index) = latest_lossy.insert(map_key, slots.len()) {
348                slots[previous_index] = None;
349            }
350        }
351        slots.push(Some((root, frame)));
352    }
353
354    slots.into_iter().flatten().collect()
355}
356
/// Outcome of fanning one Push frame out to the routes of its project root.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct FanOutResult {
    /// Route channels whose project/session scope matched the frame. A reliable
    /// frame that matched a channel but hit writer backpressure waits in
    /// `retry_buffer`; it is not mistaken for detach replay.
    matched_channels: usize,
    /// Frames the writer queue accepted immediately. Unaccepted lossy frames are
    /// dropped; unaccepted reliable frames are retried after transient backpressure.
    sent_frames: usize,
}

/// Result of one non-blocking attempt to enqueue a Push frame on the writer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushSendOutcome {
    /// The writer queue accepted the frame.
    Sent,
    /// The writer queue was full; a later attempt may succeed.
    Backpressure,
    /// The frame can never be sent (bad channel, build/serialize failure, or
    /// closed writer).
    PermanentFailure,
}
374
375fn try_send_push_body(
376    writer_tx: &mpsc::Sender<Frame>,
377    channel: RouteChannel,
378    body: &[u8],
379) -> PushSendOutcome {
380    let Ok(route_channel) = u16::try_from(channel) else {
381        log::warn!("subc attach: invalid route channel {channel} for Push fan-out");
382        return PushSendOutcome::PermanentFailure;
383    };
384    let push_frame = match Frame::build_with_version(
385        PROTOCOL_VERSION,
386        FrameType::Push,
387        control_flags(),
388        route_channel,
389        0,
390        body.to_vec(),
391    ) {
392        Ok(frame) => frame,
393        Err(error) => {
394            log::warn!("subc attach: failed to build Push frame: {error}");
395            return PushSendOutcome::PermanentFailure;
396        }
397    };
398    match writer_tx.try_send(push_frame) {
399        Ok(()) => PushSendOutcome::Sent,
400        Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
401        Err(mpsc::error::TrySendError::Closed(_)) => {
402            log::warn!("subc attach: writer closed while sending Push frame");
403            PushSendOutcome::PermanentFailure
404        }
405    }
406}
407
408fn try_send_push_frame(
409    writer_tx: &mpsc::Sender<Frame>,
410    channel: RouteChannel,
411    frame: &PushFrame,
412) -> PushSendOutcome {
413    let body = match serde_json::to_vec(frame) {
414        Ok(body) => body,
415        Err(error) => {
416            log::warn!("subc attach: failed to serialize PushFrame: {error}");
417            return PushSendOutcome::PermanentFailure;
418        }
419    };
420    try_send_push_body(writer_tx, channel, &body)
421}
422
423fn bounded_push_back<T>(queue: &mut VecDeque<T>, item: T) {
424    if queue.len() >= PUSH_BUFFER_MAX_PER_KEY {
425        queue.pop_front();
426    }
427    queue.push_back(item);
428}
429
430fn buffer_push_frame(
431    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
432    key: ReplayKey,
433    frame: PushFrame,
434) {
435    bounded_push_back(push_buffer.entry(key).or_default(), frame);
436}
437
438fn buffer_retry_frame(
439    retry_buffer: &mut RetryBuffer,
440    channel: RouteChannel,
441    key: ReplayKey,
442    frame: PushFrame,
443) {
444    bounded_push_back(retry_buffer.entry(channel).or_default(), (key, frame));
445}
446
447fn migrate_retry_buffer_to_push_buffer(
448    retry_buffer: &mut RetryBuffer,
449    channel: RouteChannel,
450    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
451) -> usize {
452    let Some(frames) = retry_buffer.remove(&channel) else {
453        return 0;
454    };
455    let migrated = frames.len();
456    for (key, frame) in frames {
457        buffer_push_frame(push_buffer, key, frame);
458    }
459    migrated
460}
461
462fn replay_buffered_push_frames(
463    writer_tx: &mpsc::Sender<Frame>,
464    channel: RouteChannel,
465    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
466    key: &ReplayKey,
467) -> usize {
468    let mut sent = 0;
469    let remove_empty;
470
471    {
472        let Some(queue) = push_buffer.get_mut(key) else {
473            return 0;
474        };
475
476        while let Some(frame) = queue.pop_front() {
477            match try_send_push_frame(writer_tx, channel, &frame) {
478                PushSendOutcome::Sent => sent += 1,
479                PushSendOutcome::Backpressure => {
480                    queue.push_front(frame);
481                    break;
482                }
483                PushSendOutcome::PermanentFailure => {
484                    log::warn!(
485                        "subc attach: dropping buffered reliable Push for root {} harness {} session {} after permanent send failure",
486                        key.root.as_path().display(),
487                        key.harness,
488                        key.session
489                    );
490                }
491            }
492        }
493
494        remove_empty = queue.is_empty();
495    }
496
497    if remove_empty {
498        push_buffer.remove(key);
499    }
500
501    sent
502}
503
504fn drain_retry_buffer_for_channel(
505    writer_tx: &mpsc::Sender<Frame>,
506    channel: RouteChannel,
507    retry_buffer: &mut RetryBuffer,
508) -> usize {
509    let mut sent = 0;
510    let remove_empty;
511
512    {
513        let Some(queue) = retry_buffer.get_mut(&channel) else {
514            return 0;
515        };
516
517        while let Some((key, frame)) = queue.pop_front() {
518            match try_send_push_frame(writer_tx, channel, &frame) {
519                PushSendOutcome::Sent => sent += 1,
520                PushSendOutcome::Backpressure => {
521                    queue.push_front((key, frame));
522                    break;
523                }
524                PushSendOutcome::PermanentFailure => {
525                    log::warn!(
526                        "subc attach: dropping retry-buffered reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
527                        key.root.as_path().display(),
528                        key.harness,
529                        key.session
530                    );
531                }
532            }
533        }
534
535        remove_empty = queue.is_empty();
536    }
537
538    if remove_empty {
539        retry_buffer.remove(&channel);
540    }
541
542    sent
543}
544
545fn drain_retry_buffers_for_bound_routes(
546    writer_tx: &mpsc::Sender<Frame>,
547    routes: &HashMap<RouteChannel, RouteIdentity>,
548    retry_buffer: &mut RetryBuffer,
549) -> usize {
550    let channels: Vec<RouteChannel> = routes.keys().copied().collect();
551    channels
552        .into_iter()
553        .map(|channel| drain_retry_buffer_for_channel(writer_tx, channel, retry_buffer))
554        .sum()
555}
556
557fn matching_route_channels(
558    routes: &HashMap<RouteChannel, RouteIdentity>,
559    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
560    root: &ProjectRootId,
561    frame: &PushFrame,
562) -> Vec<RouteChannel> {
563    let Some(channels) = root_channels.get(root) else {
564        return Vec::new();
565    };
566
567    let session = frame_session(frame);
568    channels
569        .iter()
570        .copied()
571        .filter(|channel| match session {
572            Some(session) => routes
573                .get(channel)
574                .is_some_and(|identity| identity.session == session),
575            None => true,
576        })
577        .collect()
578}
579
580fn buffer_detached_reliable_push_frame(
581    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
582    session_identity: &HashMap<(ProjectRootId, String), String>,
583    root: &ProjectRootId,
584    frame: &PushFrame,
585) {
586    let Some(session) = frame_session(frame) else {
587        log::warn!(
588            "subc attach: dropping reliable project-scoped Push for root {} because no route is bound",
589            root.as_path().display()
590        );
591        return;
592    };
593
594    if let Some(key) = replay_key_for_session(session_identity, root, session) {
595        buffer_push_frame(push_buffer, key, frame.clone());
596    } else {
597        log::warn!(
598            "subc attach: dropping reliable Push for root {} session {} because no retained harness identity is known",
599            root.as_path().display(),
600            session
601        );
602    }
603}
604
605fn fan_out_lossy_push_frame(
606    writer_tx: &mpsc::Sender<Frame>,
607    routes: &HashMap<RouteChannel, RouteIdentity>,
608    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
609    root: &ProjectRootId,
610    frame: &PushFrame,
611) -> FanOutResult {
612    let matching_channels = matching_route_channels(routes, root_channels, root, frame);
613    let matched_channels = matching_channels.len();
614    if matched_channels == 0 {
615        return FanOutResult::default();
616    }
617
618    let body = match serde_json::to_vec(frame) {
619        Ok(body) => body,
620        Err(error) => {
621            log::warn!("subc attach: failed to serialize PushFrame for fan-out: {error}");
622            return FanOutResult {
623                matched_channels,
624                sent_frames: 0,
625            };
626        }
627    };
628
629    let sent_frames = matching_channels
630        .into_iter()
631        .filter(|&channel| {
632            matches!(
633                try_send_push_body(writer_tx, channel, &body),
634                PushSendOutcome::Sent
635            )
636        })
637        .count();
638
639    FanOutResult {
640        matched_channels,
641        sent_frames,
642    }
643}
644
645fn fan_out_reliable_push_frame(
646    writer_tx: &mpsc::Sender<Frame>,
647    routes: &HashMap<RouteChannel, RouteIdentity>,
648    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
649    session_identity: &HashMap<(ProjectRootId, String), String>,
650    retry_buffer: &mut RetryBuffer,
651    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
652    root: &ProjectRootId,
653    frame: &PushFrame,
654) -> FanOutResult {
655    let matching_channels = matching_route_channels(routes, root_channels, root, frame);
656    let matched_channels = matching_channels.len();
657    if matched_channels == 0 {
658        buffer_detached_reliable_push_frame(push_buffer, session_identity, root, frame);
659        return FanOutResult::default();
660    }
661
662    let mut sent_frames = 0;
663    for channel in matching_channels {
664        let Some(identity) = routes.get(&channel) else {
665            log::warn!(
666                "subc attach: dropping reliable Push for stale route channel {channel} with no route identity"
667            );
668            continue;
669        };
670        let key = ReplayKey::from_identity(identity);
671
672        if retry_buffer
673            .get(&channel)
674            .is_some_and(|queue| !queue.is_empty())
675        {
676            buffer_retry_frame(retry_buffer, channel, key, frame.clone());
677            continue;
678        }
679
680        match try_send_push_frame(writer_tx, channel, frame) {
681            PushSendOutcome::Sent => sent_frames += 1,
682            PushSendOutcome::Backpressure => {
683                buffer_retry_frame(retry_buffer, channel, key, frame.clone());
684            }
685            PushSendOutcome::PermanentFailure => {
686                log::warn!(
687                    "subc attach: dropping reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
688                    key.root.as_path().display(),
689                    key.harness,
690                    key.session
691                );
692            }
693        }
694    }
695
696    FanOutResult {
697        matched_channels,
698        sent_frames,
699    }
700}
701
702fn process_reliable_push_frame(
703    writer_tx: &mpsc::Sender<Frame>,
704    routes: &HashMap<RouteChannel, RouteIdentity>,
705    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
706    session_identity: &HashMap<(ProjectRootId, String), String>,
707    retry_buffer: &mut RetryBuffer,
708    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
709    completed_tasks: &mut CompletedTaskIds,
710    root: ProjectRootId,
711    frame: PushFrame,
712) {
713    if let Some(task_id) = completed_task_id(&frame) {
714        completed_tasks.remember(task_id);
715    }
716    let _ = fan_out_reliable_push_frame(
717        writer_tx,
718        routes,
719        root_channels,
720        session_identity,
721        retry_buffer,
722        push_buffer,
723        &root,
724        &frame,
725    );
726}
727
728fn process_lossy_push_frame(
729    writer_tx: &mpsc::Sender<Frame>,
730    routes: &HashMap<RouteChannel, RouteIdentity>,
731    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
732    completed_tasks: &CompletedTaskIds,
733    root: ProjectRootId,
734    frame: PushFrame,
735) {
736    if should_drop_lossy_push(completed_tasks, &frame) {
737        if let Some(task_id) = long_running_task_id(&frame) {
738            log::debug!(
739                "subc attach: dropping stale BashLongRunning Push for completed task {task_id}"
740            );
741        }
742        return;
743    }
744
745    let _ = fan_out_lossy_push_frame(writer_tx, routes, root_channels, &root, &frame);
746}
747
748/// Sync command dispatch, passed in from `main` (the binary owns the command
749/// table). Invoked only inside executor jobs in subc mode.
750pub type DispatchFn = fn(RawRequest, &AppContext) -> Response;
751
752/// Entry point for `aft --subc <connection-file>`. Synchronous on the outside;
753/// owns an isolated current-thread tokio runtime for the async transport.
754/// Returns `Err` (fail-loud) on any connect/auth/protocol failure — we never
755/// fall back to the standalone loop, to avoid split-brain index state.
756pub fn run_subc_mode(
757    connection_file_path: &Path,
758    ctx: Arc<AppContext>,
759    executor: Arc<Executor>,
760    dispatch: DispatchFn,
761    user_config_path: Option<PathBuf>,
762) -> Result<(), SubcError> {
763    // Production NEVER allows non-manifest tool names on route channels: AFT
764    // fails closed and does not trust subc to enforce the manifest. The
765    // test-only harness sets this through `run_subc_mode_for_test`.
766    run_subc_mode_inner(
767        connection_file_path,
768        ctx,
769        executor,
770        dispatch,
771        user_config_path,
772        false,
773    )
774}
775
776fn run_subc_mode_inner(
777    connection_file_path: &Path,
778    ctx: Arc<AppContext>,
779    executor: Arc<Executor>,
780    dispatch: DispatchFn,
781    user_config_path: Option<PathBuf>,
782    allow_native_passthrough: bool,
783) -> Result<(), SubcError> {
784    let runtime = tokio::runtime::Builder::new_current_thread()
785        .enable_all()
786        .build()
787        .map_err(SubcError::Runtime)?;
788
789    let executor_for_loop = Arc::clone(&executor);
790    let loop_result = runtime.block_on(async move {
791        let shared_app = ctx.app();
792        drop(ctx);
793        let stream = connect_and_authenticate(connection_file_path).await?;
794        log::info!(
795            "subc attach: authenticated to daemon via {}",
796            connection_file_path.display()
797        );
798        let (read_half, write_half) = tokio::io::split(stream);
799        run_module_loop(
800            read_half,
801            write_half,
802            shared_app,
803            executor_for_loop,
804            dispatch,
805            user_config_path,
806            allow_native_passthrough,
807        )
808        .await
809    });
810
811    for actor_ctx in executor.actor_contexts() {
812        actor_ctx.lsp().shutdown_all();
813        actor_ctx.bash_background().detach();
814    }
815
816    loop_result
817}
818
819/// Test-only entry that enables the non-manifest native-command passthrough on
820/// route channels. Integration tests drive synthetic native commands (`glob`,
821/// `callers`, `subc_test_echo_session`, …) through the executor to exercise
822/// mechanics; production callers use [`run_subc_mode`], which fails closed.
823#[doc(hidden)]
824pub fn run_subc_mode_for_test(
825    connection_file_path: &Path,
826    ctx: Arc<AppContext>,
827    executor: Arc<Executor>,
828    dispatch: DispatchFn,
829    user_config_path: Option<PathBuf>,
830) -> Result<(), SubcError> {
831    run_subc_mode_inner(
832        connection_file_path,
833        ctx,
834        executor,
835        dispatch,
836        user_config_path,
837        true,
838    )
839}
840
841/// Read the connection file → resolve the first endpoint → TCP connect → HMAC
842/// handshake. Mirrors the reference `fake-aft-stub::connect_to_subc`.
843async fn connect_and_authenticate(connection_file_path: &Path) -> Result<TcpStream, SubcError> {
844    let conn = connection_file::read(connection_file_path).map_err(|source| {
845        SubcError::ConnectionFile {
846            path: connection_file_path.to_path_buf(),
847            source,
848        }
849    })?;
850
851    let endpoint = conn
852        .endpoints
853        .first()
854        .ok_or_else(|| SubcError::NoEndpoint {
855            path: connection_file_path.to_path_buf(),
856        })?;
857    let endpoint_label = format!("{}:{}", endpoint.host, endpoint.port);
858    let ip = endpoint
859        .host
860        .parse::<IpAddr>()
861        .map_err(|_| SubcError::InvalidEndpoint {
862            path: connection_file_path.to_path_buf(),
863            endpoint: endpoint_label.clone(),
864        })?;
865    let addr = SocketAddr::new(ip, endpoint.port);
866
867    let mut stream = TcpStream::connect(addr)
868        .await
869        .map_err(|source| SubcError::Connect {
870            endpoint: endpoint_label.clone(),
871            source,
872        })?;
873
874    authenticate_client(&mut stream, &conn, AUTH_DEADLINE)
875        .await
876        .map_err(|source| SubcError::Auth {
877            endpoint: endpoint_label,
878            source,
879        })?;
880
881    Ok(stream)
882}
883
/// ModuleHello → HelloAck → control/route loop. Runs until the daemon closes
/// the connection (EOF), sends channel-0 Goodbye, or a fatal mutating executor
/// response requests whole-connection teardown.
///
/// `read` / `write` are the two halves of an already-authenticated connection.
/// The handshake is done inline: one ModuleHello is written and exactly one
/// reply frame is read. After that, `write` is moved into a writer task and
/// `read` into a reader task, and every outbound frame goes through the
/// writer channel.
///
/// `shared_app`, `dispatch`, and `user_config_path` are forwarded to channel-0
/// control handling (RouteBind). `executor` is shared by control handling,
/// tool calls, bind completion, and periodic maintenance.
/// `allow_native_passthrough` is forwarded to route-channel tool-call
/// handling.
///
/// # Errors
///
/// - [`SubcError::ClosedBeforeHelloAck`] if the stream ends before any reply.
/// - [`SubcError::HelloRejected`] if the daemon answers with an Error frame.
///   The body is decoded as [`ErrorBody`] when possible, otherwise `None`.
/// - [`SubcError::UnexpectedFrame`] for any other reply type.
/// - Serialization, frame-build, and frame-I/O failures during the handshake.
/// - Any error returned by a per-frame handler. Such an error ends the loop
///   and the connection.
///
/// A loop error takes precedence over a writer-task error. If the loop ended
/// cleanly, the writer task's flush result is returned instead.
async fn run_module_loop<R, W>(
    mut read: R,
    mut write: W,
    shared_app: Arc<App>,
    executor: Arc<Executor>,
    dispatch: DispatchFn,
    user_config_path: Option<PathBuf>,
    allow_native_passthrough: bool,
) -> Result<(), SubcError>
where
    R: AsyncRead + Unpin + Send + 'static,
    W: AsyncWrite + Unpin + Send + 'static,
{
    // ModuleHello: register as a tool provider. control_ops:None = full baseline.
    let hello = ModuleHelloBody {
        manifest: build_manifest(),
        protocol_ver: PROTOCOL_VERSION,
        control_ops: None,
    };
    let hello_frame = Frame::build(
        FrameType::Hello,
        control_flags(),
        0,
        HELLO_CORR,
        serde_json::to_vec(&hello).map_err(SubcError::Json)?,
    )
    .map_err(SubcError::FrameBuild)?;
    write_frame(&mut write, &hello_frame)
        .await
        .map_err(SubcError::FrameIo)?;

    // Expect HelloAck (registered) or a channel-0 Error (manifest/version reject).
    // Calling `read_frame` directly is fine here: nothing is being raced against
    // it yet, so it cannot be cancelled mid-frame.
    match read_frame(&mut read).await.map_err(SubcError::FrameIo)? {
        None => return Err(SubcError::ClosedBeforeHelloAck),
        Some(frame) => match frame.header.ty {
            FrameType::HelloAck => {
                log::info!("subc attach: registered (HelloAck received)");
            }
            FrameType::Error => {
                let body = serde_json::from_slice::<ErrorBody>(&frame.body).ok();
                return Err(SubcError::HelloRejected { body });
            }
            other => return Err(SubcError::UnexpectedFrame { ty: other }),
        },
    }

    // From here on the writer task owns `write`. All outbound frames are queued
    // on `writer_tx` (bounded to 256) and written in order by that one task.
    let (writer_tx, writer_rx) = mpsc::channel::<Frame>(256);
    let writer_task = spawn_writer_task(write, writer_rx);
    // `read_frame` is NOT cancellation-safe, so it must never sit directly inside
    // the `select!` below: a drain-interval tick (or shutdown) firing while a
    // frame is mid-transit would drop the partially-consumed bytes and desync the
    // stream (the next read would parse a body byte as a frame header). A
    // dedicated reader task owns the socket, reads whole frames sequentially, and
    // forwards them over a channel; the loop selects on the cancel-safe `recv()`.
    let (reader_tx, mut reader_rx) = mpsc::channel::<Result<Frame, SubcError>>(256);
    let reader_task = spawn_reader_task(read, reader_tx);
    // Signalled (via `signal_fatal_teardown`) when a fatal executor response
    // requires tearing down the whole connection.
    let shutdown = Arc::new(Notify::new());
    // Cadence for retrying backpressured reliable pushes and scheduling maintenance.
    let mut drain_interval = tokio::time::interval(Duration::from_millis(250));
    // Results of maintenance drains submitted from the tick arm below.
    let (maintenance_tx, mut maintenance_rx) = mpsc::channel::<(ProjectRootId, Response)>(256);
    // Completions of RouteBind configure jobs spawned by `handle_control_request`.
    let (control_completion_tx, mut control_completion_rx) =
        mpsc::channel::<RouteBindCompletion>(256);
    // Push lanes fed by per-actor progress senders: lossy frames go through a
    // bounded queue; reliable frames go through an unbounded one.
    let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1024);
    let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
    let push_senders = PushSenders {
        lossy_tx,
        reliable_tx,
    };
    // Loop-owned routing and delivery state. These are locals of this task, so
    // only the arms below mutate them.
    let mut routes: HashMap<RouteChannel, RouteIdentity> = HashMap::new();
    let mut root_channels: HashMap<ProjectRootId, HashSet<RouteChannel>> = HashMap::new();
    let mut session_identity: HashMap<(ProjectRootId, String), String> = HashMap::new();
    let mut push_buffer: HashMap<ReplayKey, VecDeque<PushFrame>> = HashMap::new();
    let mut retry_buffer: RetryBuffer = HashMap::new();
    let mut completed_tasks = CompletedTaskIds::default();
    let mut live_roots: HashMap<ProjectRootId, RootMeta> = HashMap::new();
    let mut pending_binds: HashMap<RouteChannel, PendingBind> = HashMap::new();

    let loop_result: Result<(), SubcError> = loop {
        tokio::select! {
            _ = shutdown.notified() => {
                log::warn!("subc attach: fatal executor response requested teardown");
                break Ok(());
            }
            maybe_frame = reader_rx.recv() => {
                // `None` means the reader task exited without an error (EOF).
                let frame = match maybe_frame {
                    None => {
                        log::info!("subc attach: daemon closed connection");
                        break Ok(());
                    }
                    Some(Err(error)) => break Err(error),
                    Some(Ok(frame)) => frame,
                };

                match frame.header.ty {
                    // Keepalive: echo version, flags, and correlation id on a channel-0 Pong.
                    FrameType::Ping if frame.header.channel == 0 => {
                        let pong = match Frame::build_with_version(
                            frame.header.ver,
                            FrameType::Pong,
                            frame.header.flags,
                            0,
                            frame.header.corr,
                            Vec::new(),
                        ) {
                            Ok(pong) => pong,
                            Err(error) => break Err(SubcError::FrameBuild(error)),
                        };
                        if let Err(error) = send_frame(&writer_tx, pong).await {
                            break Err(error);
                        }
                    }
                    FrameType::Goodbye if frame.header.channel == 0 => {
                        log::info!("subc attach: received channel-0 Goodbye");
                        break Ok(());
                    }
                    // Route-channel Goodbye: one route was torn down. Mark any
                    // in-flight RouteBind on it as cancelled (its completion is
                    // then discarded), move its retry-buffered reliable pushes
                    // into the detach replay buffer, and unbind the route.
                    FrameType::Goodbye => {
                        let channel = route_key(frame.header.channel);
                        if let Some(pending) = pending_binds.get_mut(&channel) {
                            pending.cancelled = true;
                            log::debug!(
                                "subc attach: cancelled pending RouteBind for route {} on Goodbye",
                                frame.header.channel
                            );
                        }
                        let migrated = migrate_retry_buffer_to_push_buffer(
                            &mut retry_buffer,
                            channel,
                            &mut push_buffer,
                        );
                        if let Some(identity) = remove_route_channel(&mut routes, &mut root_channels, channel) {
                            if migrated > 0 {
                                log::debug!(
                                    "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from route {} into detach replay",
                                    frame.header.channel
                                );
                            }
                            if let Some(meta) = live_roots.get_mut(&identity.root) {
                                // Measure idleness before refreshing the timestamp.
                                let idle_for = meta.last_touched.elapsed();
                                meta.touch();
                                log::debug!(
                                    "subc attach: route {} torn down for root {} harness {} session {} (last touched {:?} ago)",
                                    frame.header.channel,
                                    identity.root.as_path().display(),
                                    identity.harness,
                                    identity.session,
                                    idle_for
                                );
                            } else {
                                log::debug!(
                                    "subc attach: route {} torn down for root {} harness {} session {}",
                                    frame.header.channel,
                                    identity.root.as_path().display(),
                                    identity.harness,
                                    identity.session
                                );
                            }
                        } else {
                            if migrated > 0 {
                                log::debug!(
                                    "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from unbound route {} into detach replay",
                                    frame.header.channel
                                );
                            }
                            log::debug!("subc attach: unbound route {} torn down", frame.header.channel);
                        }
                    }
                    // Channel-0 control request (RouteBind). Errors returned here end the connection.
                    FrameType::Request if frame.header.channel == 0 => {
                        if let Err(error) = handle_control_request(
                            &writer_tx,
                            &frame,
                            &shared_app,
                            &executor,
                            &mut live_roots,
                            &mut pending_binds,
                            &control_completion_tx,
                            &push_senders,
                            dispatch,
                            user_config_path.as_deref(),
                        )
                        .await
                        {
                            break Err(error);
                        }
                    }
                    // Route-channel tool call. Errors returned here end the connection.
                    FrameType::Request => {
                        if let Err(error) = handle_tool_call(
                            &writer_tx,
                            &frame,
                            &routes,
                            &pending_binds,
                            &mut live_roots,
                            &executor,
                            &shutdown,
                            dispatch,
                            allow_native_passthrough,
                        )
                        .await
                        {
                            break Err(error);
                        }
                    }
                    // Cancel/Push/etc. are not yet handled: in-flight tool-call
                    // cancellation is not implemented, so these frame types are
                    // ignored rather than acted on.
                    _ => {}
                }
            }
            Some((root_id, frame)) = reliable_rx.recv() => {
                // Drain reliable frames in FIFO order. They are intentionally not
                // coalesced: completion, pattern-match, and warning frames are
                // must-deliver events.
                let mut batch = vec![(root_id, frame)];
                while let Ok(item) = reliable_rx.try_recv() {
                    batch.push(item);
                }

                for (root, frame) in batch {
                    process_reliable_push_frame(
                        &writer_tx,
                        &routes,
                        &root_channels,
                        &session_identity,
                        &mut retry_buffer,
                        &mut push_buffer,
                        &mut completed_tasks,
                        root,
                        frame,
                    );
                }
            }
            Some((root_id, frame)) = lossy_rx.recv() => {
                // If both lanes are ready, process any already-queued reliable
                // completions first so a following stale BashLongRunning frame can
                // be suppressed even if select! happened to wake on the lossy lane.
                while let Ok((reliable_root, reliable_frame)) = reliable_rx.try_recv() {
                    process_reliable_push_frame(
                        &writer_tx,
                        &routes,
                        &root_channels,
                        &session_identity,
                        &mut retry_buffer,
                        &mut push_buffer,
                        &mut completed_tasks,
                        reliable_root,
                        reliable_frame,
                    );
                }

                // Drain the currently queued burst in one loop turn so lossy
                // status/progress classes coalesce before reaching subc's shared
                // egress queue.
                let mut batch = vec![(root_id, frame)];
                while let Ok(item) = lossy_rx.try_recv() {
                    batch.push(item);
                }

                for (root, frame) in coalesce_push_batch(batch) {
                    process_lossy_push_frame(
                        &writer_tx,
                        &routes,
                        &root_channels,
                        &completed_tasks,
                        root,
                        frame,
                    );
                }
            }
            // A RouteBind configure job finished: ack, reject, or discard the bind.
            Some(completion) = control_completion_rx.recv() => {
                if let Err(error) = handle_route_bind_completion(
                    &writer_tx,
                    completion,
                    &mut routes,
                    &mut root_channels,
                    &mut session_identity,
                    &mut push_buffer,
                    &mut live_roots,
                    &mut pending_binds,
                    &executor,
                    &shutdown,
                )
                .await
                {
                    break Err(error);
                }
            }
            // A maintenance drain finished. Clear the pending flag so the next tick
            // may schedule another; an internal-error response is fatal.
            Some((root_id, response)) = maintenance_rx.recv() => {
                if let Some(meta) = live_roots.get_mut(&root_id) {
                    meta.maintenance_pending = false;
                }
                if response_is_internal_error(&response) {
                    signal_fatal_teardown(&writer_tx, None, PROTOCOL_VERSION, 0, &shutdown).await;
                }
            }
            // Periodic tick: retry backpressured reliable pushes on bound routes,
            // then submit one maintenance drain per live root with none outstanding.
            _ = drain_interval.tick() => {
                let retried = drain_retry_buffers_for_bound_routes(
                    &writer_tx,
                    &routes,
                    &mut retry_buffer,
                );
                if retried > 0 {
                    log::debug!(
                        "subc attach: retried {retried} reliable Push frame(s) after writer backpressure"
                    );
                }

                let due_roots: Vec<ProjectRootId> = live_roots
                    .iter_mut()
                    .filter_map(|(root_id, meta)| {
                        if meta.maintenance_pending {
                            None
                        } else {
                            meta.maintenance_pending = true;
                            Some(root_id.clone())
                        }
                    })
                    .collect();
                for root_id in due_roots {
                    submit_maintenance_drain(&executor, root_id, &maintenance_tx);
                }
            }
        }
    };

    // The reader task may be parked on `read_frame`; abort it (we are done with
    // the connection) and flush the writer.
    reader_task.abort();
    drop(writer_tx);
    let writer_result = finish_writer_task(writer_task).await;
    // `Result::and`: a loop error wins; otherwise report the writer's outcome.
    loop_result.and(writer_result)
}
1215
1216fn spawn_writer_task<W>(
1217    mut write: W,
1218    mut rx: mpsc::Receiver<Frame>,
1219) -> JoinHandle<Result<(), subc_transport::FrameIoError>>
1220where
1221    W: AsyncWrite + Unpin + Send + 'static,
1222{
1223    tokio::spawn(async move {
1224        while let Some(frame) = rx.recv().await {
1225            write_frame(&mut write, &frame).await?;
1226        }
1227        Ok(())
1228    })
1229}
1230
1231/// Owns the read half and reads whole frames sequentially. `read_frame` is not
1232/// cancellation-safe, so it must run here — never inside the main loop's
1233/// `select!` — to keep the inbound stream framed. Each frame (or the terminal
1234/// error / EOF) is forwarded over `tx`; the loop consumes them via cancel-safe
1235/// `recv()`. Exits on EOF (Ok(None)), a read error, or when `tx` is dropped
1236/// (the loop ended and aborted us).
1237fn spawn_reader_task<R>(mut read: R, tx: mpsc::Sender<Result<Frame, SubcError>>) -> JoinHandle<()>
1238where
1239    R: AsyncRead + Unpin + Send + 'static,
1240{
1241    tokio::spawn(async move {
1242        loop {
1243            match read_frame(&mut read).await {
1244                Ok(Some(frame)) => {
1245                    if tx.send(Ok(frame)).await.is_err() {
1246                        return;
1247                    }
1248                }
1249                Ok(None) => {
1250                    // EOF: let the loop observe channel close as "daemon closed".
1251                    return;
1252                }
1253                Err(error) => {
1254                    let _ = tx.send(Err(SubcError::FrameIo(error))).await;
1255                    return;
1256                }
1257            }
1258        }
1259    })
1260}
1261
1262async fn finish_writer_task(
1263    mut writer_task: JoinHandle<Result<(), subc_transport::FrameIoError>>,
1264) -> Result<(), SubcError> {
1265    match tokio::time::timeout(Duration::from_millis(100), &mut writer_task).await {
1266        Ok(Ok(Ok(()))) => Ok(()),
1267        Ok(Ok(Err(error))) => Err(SubcError::FrameIo(error)),
1268        Ok(Err(error)) => Err(SubcError::WriterJoin(error)),
1269        Err(_) => {
1270            writer_task.abort();
1271            Ok(())
1272        }
1273    }
1274}
1275
1276async fn send_frame(tx: &mpsc::Sender<Frame>, frame: Frame) -> Result<(), SubcError> {
1277    match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(frame)).await {
1278        Ok(Ok(())) => Ok(()),
1279        Ok(Err(_)) => Err(SubcError::WriterClosed),
1280        Err(_) => Err(SubcError::WriterBackpressureTimeout),
1281    }
1282}
1283
1284fn rollback_pending_bind_actor(
1285    executor: &Arc<Executor>,
1286    live_roots: &HashMap<ProjectRootId, RootMeta>,
1287    root_id: &ProjectRootId,
1288    inserted_new_actor: bool,
1289) {
1290    if inserted_new_actor && !live_roots.contains_key(root_id) {
1291        executor.remove_actor(root_id);
1292    }
1293}
1294
1295async fn handle_route_bind_completion(
1296    tx: &mpsc::Sender<Frame>,
1297    completion: RouteBindCompletion,
1298    routes: &mut HashMap<RouteChannel, RouteIdentity>,
1299    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
1300    session_identity: &mut HashMap<(ProjectRootId, String), String>,
1301    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1302    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1303    pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1304    executor: &Arc<Executor>,
1305    shutdown: &Arc<Notify>,
1306) -> Result<(), SubcError> {
1307    let route_id = route_key(completion.route_channel);
1308    let Some(pending) = pending_binds.remove(&route_id) else {
1309        log::warn!(
1310            "subc attach: dropping RouteBind completion for non-pending route {}",
1311            completion.route_channel
1312        );
1313        rollback_pending_bind_actor(
1314            executor,
1315            live_roots,
1316            &completion.bind_root_id,
1317            completion.inserted_new_actor,
1318        );
1319        return Ok(());
1320    };
1321
1322    if pending.bind_root_id != completion.bind_root_id {
1323        log::warn!(
1324            "subc attach: pending RouteBind root mismatch for route {} (pending {} completion {})",
1325            completion.route_channel,
1326            pending.bind_root_id.as_path().display(),
1327            completion.bind_root_id.as_path().display()
1328        );
1329    }
1330
1331    let inserted_new_actor = pending.inserted_new_actor || completion.inserted_new_actor;
1332    if pending.cancelled {
1333        rollback_pending_bind_actor(
1334            executor,
1335            live_roots,
1336            &completion.bind_root_id,
1337            inserted_new_actor,
1338        );
1339        log::debug!(
1340            "subc attach: discarded completed RouteBind for cancelled route {} root {}",
1341            completion.route_channel,
1342            completion.bind_root_id.as_path().display()
1343        );
1344        return Ok(());
1345    }
1346
1347    let failure = if !completion.configure_response.success {
1348        Some((
1349            &completion.configure_response,
1350            "configure failed during route bind",
1351        ))
1352    } else if let Some(drain_response) = completion.drain_response.as_ref() {
1353        if drain_response.success {
1354            None
1355        } else {
1356            Some((
1357                drain_response,
1358                "build-completion drain failed during route bind",
1359            ))
1360        }
1361    } else {
1362        None
1363    };
1364
1365    if let Some((response, fallback)) = failure {
1366        rollback_pending_bind_actor(
1367            executor,
1368            live_roots,
1369            &completion.bind_root_id,
1370            inserted_new_actor,
1371        );
1372        let message = response_message(response, fallback);
1373        let fatal = response_is_internal_error(response);
1374        send_route_bind_error_parts(
1375            tx,
1376            completion.ver,
1377            completion.corr,
1378            completion.flags,
1379            "config_divergence",
1380            &message,
1381        )
1382        .await?;
1383        if fatal {
1384            signal_fatal_teardown(
1385                tx,
1386                Some(completion.route_channel),
1387                completion.ver,
1388                completion.corr,
1389                shutdown,
1390            )
1391            .await;
1392        }
1393        return Ok(());
1394    }
1395
1396    remember_session_identity(session_identity, &completion.identity);
1397    let replay_key = ReplayKey::from_identity(&completion.identity);
1398    insert_route_channel(routes, root_channels, route_id, completion.identity);
1399    live_roots
1400        .entry(completion.bind_root_id.clone())
1401        .and_modify(|meta| {
1402            meta.touch();
1403            meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1404        })
1405        .or_insert_with(|| RootMeta::new(Instant::now()));
1406    if let Some(meta) = live_roots.get_mut(&completion.bind_root_id) {
1407        meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1408    }
1409
1410    let ack =
1411        serde_json::to_vec(&ModuleControlResponse::RouteBindAck {}).map_err(SubcError::Json)?;
1412    let response = Frame::build_with_version(
1413        completion.ver,
1414        FrameType::Response,
1415        control_flags(),
1416        0,
1417        completion.corr,
1418        ack,
1419    )
1420    .map_err(SubcError::FrameBuild)?;
1421    send_frame(tx, response).await?;
1422    let replayed = replay_buffered_push_frames(tx, route_id, push_buffer, &replay_key);
1423    if replayed > 0 {
1424        log::debug!(
1425            "subc attach: replayed {} buffered Push frame(s) to route {} root {} harness {} session {}",
1426            replayed,
1427            completion.route_channel,
1428            replay_key.root.as_path().display(),
1429            replay_key.harness,
1430            replay_key.session
1431        );
1432    }
1433    log::info!(
1434        "subc attach: route {} bound to root {}",
1435        completion.route_channel,
1436        completion.bind_root_id.as_path().display()
1437    );
1438    Ok(())
1439}
1440
/// channel-0 control request — currently only RouteBind. Reconciles the route's
/// RootConfig through the executor's Mutating lane and resolves completion on a
/// loop-owned control-completion channel so slow configure jobs do not block the
/// transport loop.
///
/// Flow for RouteBind:
/// 1. Reject the bind with a `config_divergence` error in three cases:
///    - a bind is already pending on the channel;
///    - the project root cannot be turned into a `ProjectRootId`;
///    - the configure request cannot be built.
/// 2. Reuse the root's actor if the root is already in `live_roots`.
///    Otherwise create and register a fresh `AppContext` actor.
/// 3. Record a `PendingBind` and submit the configure job on `Lane::Mutating`.
/// 4. Spawn a task that awaits that job. If configure succeeded and the root was
///    not live, it also runs a build-completion drain. It then forwards a
///    `RouteBindCompletion` to the loop.
///
/// On the happy path this returns `Ok(())` as soon as the job is queued. The
/// RouteBindAck or error reply is sent later, when the loop handles the
/// completion.
///
/// # Errors
///
/// A body that does not decode as `ModuleControlRequest` returns
/// `SubcError::Json`. The caller treats any `Err` from this function as fatal
/// to the connection. Rejections in step 1 return whatever
/// `send_route_bind_error` returns.
async fn handle_control_request(
    tx: &mpsc::Sender<Frame>,
    frame: &Frame,
    shared_app: &Arc<App>,
    executor: &Arc<Executor>,
    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
    pending_binds: &mut HashMap<RouteChannel, PendingBind>,
    control_completion_tx: &mpsc::Sender<RouteBindCompletion>,
    push_senders: &PushSenders,
    dispatch: DispatchFn,
    user_config_path: Option<&Path>,
) -> Result<(), SubcError> {
    let request =
        serde_json::from_slice::<ModuleControlRequest>(&frame.body).map_err(SubcError::Json)?;
    match request {
        ModuleControlRequest::RouteBind {
            route_channel,
            target: _,
            identity,
            // Any wire-relayed `config` field is ignored via `..`: AFT reads config
            // from CortexKit files, never the wire. `..` keeps this tolerant whether
            // the protocol version still carries the field or has dropped it, so a
            // protocol field-removal cannot break this destructure either way.
            ..
        } => {
            let route_id = route_key(route_channel);
            if pending_binds.contains_key(&route_id) {
                return send_route_bind_error(
                    tx,
                    frame,
                    "config_divergence",
                    "route bind is already pending for channel",
                )
                .await;
            }

            let bind_root_id = match ProjectRootId::from_path(&identity.project_root) {
                Ok(root_id) => root_id,
                Err(error) => {
                    return send_route_bind_error(
                        tx,
                        frame,
                        "config_divergence",
                        &format!("invalid route project root: {error}"),
                    )
                    .await;
                }
            };

            // Reconcile RootConfig: build a configure request from the bind
            // identity plus the config tiers read from local CortexKit files
            // (below), then run it through the executor.
            let request_id = format!("subc-bind-{route_channel}");
            let bind_project_root = identity.project_root.clone();
            let bind_harness = identity.harness.clone();
            let bind_session = identity.session.clone();

            // Config is single-per-project, read by AFT directly from the
            // CortexKit config files (user: ~/.config/cortexkit/aft.jsonc,
            // project: <root>/.cortexkit/aft.jsonc). Wire-relayed config tiers
            // (if the protocol still carries them) are IGNORED entirely: a front
            // (runner or mcp:*) cannot push config over the wire. This is what
            // makes config harness-INDEPENDENT. Every harness binding a project
            // gets the identical on-disk config, so two trust domains sharing the
            // per-root actor can never diverge or inherit each other's
            // capabilities (the cross-bind escalation class). The per-tier trust
            // boundary (user trusted, project privileged-dropped) is applied to
            // the FILE tiers in handle_configure.
            let local_tiers = crate::subc_config::read_local_cortexkit_config_tiers(
                user_config_path,
                Path::new(&bind_project_root),
            );
            let config_tiers: Vec<Value> = local_tiers
                .iter()
                .map(|t| json!({ "tier": t.tier, "source": t.source, "doc": t.doc }))
                .collect();
            // Computed here because it travels with the completion and is
            // applied to the root's meta when the bind succeeds.
            let diagnostics_on_edit = diagnostics_on_edit_from_tiers(&local_tiers);
            let configure_json = json!({
                "id": request_id,
                "command": "configure",
                "project_root": bind_project_root,
                "harness": bind_harness,
                "session_id": bind_session.clone(),
                "config": config_tiers,
            });
            let configure_req = match serde_json::from_value::<RawRequest>(configure_json) {
                Ok(req) => req,
                Err(error) => {
                    return send_route_bind_error(
                        tx,
                        frame,
                        "config_divergence",
                        &format!("failed to build configure request: {error}"),
                    )
                    .await;
                }
            };

            let route_identity = RouteIdentity {
                root: bind_root_id.clone(),
                project_root: PathBuf::from(&bind_project_root),
                harness: bind_harness.clone(),
                session: bind_session.clone(),
            };
            let configure_session = route_identity.session.clone();
            let root_was_live = live_roots.contains_key(&bind_root_id);
            // NOTE(review): `live_roots` only gains this root after a bind
            // completes successfully. So two overlapping RouteBinds for the same
            // new root on different channels both take the `else` branch, and
            // both see `!root_was_live` (so both run the build-completion drain).
            // Confirm that `Executor::register_actor` returns false for an
            // already-registered root. If the first bind then fails, its
            // rollback may remove the actor the second bind is still using.
            let inserted_new_actor = if root_was_live {
                log::debug!(
                    "subc attach: reusing actor for route {} root {}",
                    route_channel,
                    bind_root_id.as_path().display()
                );
                false
            } else {
                let actor_ctx = Arc::new(AppContext::from_app(
                    Arc::clone(shared_app),
                    Config::default(),
                ));
                install_bash_compressor(&actor_ctx);
                // Route this actor's progress events into the loop's push lanes,
                // tagged with its root.
                actor_ctx.set_progress_sender(Some(progress_sender_for_root(
                    push_senders.clone(),
                    bind_root_id.clone(),
                )));
                let inserted =
                    executor.register_actor(bind_root_id.clone(), Arc::clone(&actor_ctx));
                drop(actor_ctx);
                // Do not insert into live_roots until configure succeeds: live_roots
                // drives maintenance, and a half-configured new actor must not be
                // maintenance-eligible before its route/session identity exists.
                log::debug!(
                    "subc attach: registered actor for route {} root {}",
                    route_channel,
                    bind_root_id.as_path().display()
                );
                inserted
            };

            pending_binds.insert(
                route_id,
                PendingBind {
                    bind_root_id: bind_root_id.clone(),
                    inserted_new_actor,
                    cancelled: false,
                },
            );

            let configure_request_id = configure_req.id.clone();
            let configure_rx = executor.submit_async(
                bind_root_id.clone(),
                Lane::Mutating,
                configure_request_id.clone(),
                Box::new(move |ctx| {
                    log_ctx::with_session(Some(configure_session.clone()), || {
                        dispatch(configure_req, ctx)
                    })
                }),
            );

            // Everything the completion task needs is moved in by value, so it
            // does not borrow loop state and the loop is free to keep running.
            let completion_tx = control_completion_tx.clone();
            let completion_executor = Arc::clone(executor);
            let completion_identity = route_identity;
            let completion_root = bind_root_id.clone();
            let completion_route_channel = route_channel;
            let completion_ver = frame.header.ver;
            let completion_corr = frame.header.corr;
            let completion_flags = frame.header.flags;
            tokio::spawn(async move {
                let configure_response =
                    await_executor_response(configure_rx, configure_request_id.clone()).await;
                // Only a freshly-live root gets the build-completion drain, and
                // only after configure succeeded.
                let drain_response = if configure_response.success && !root_was_live {
                    let drain_request_id = format!("subc-bind-drain-{completion_route_channel}");
                    let drain_response_id = drain_request_id.clone();
                    let drain_rx = completion_executor.submit_async(
                        completion_root.clone(),
                        Lane::Mutating,
                        drain_request_id.clone(),
                        Box::new(move |ctx| {
                            runtime_drain::drain_build_completions(ctx);
                            Response::success(drain_response_id, json!({ "drained": true }))
                        }),
                    );
                    Some(await_executor_response(drain_rx, drain_request_id).await)
                } else {
                    None
                };

                let completion = RouteBindCompletion {
                    route_channel: completion_route_channel,
                    identity: completion_identity,
                    bind_root_id: completion_root,
                    inserted_new_actor,
                    configure_response,
                    drain_response,
                    diagnostics_on_edit,
                    ver: completion_ver,
                    corr: completion_corr,
                    flags: completion_flags,
                };
                // The send fails only if the loop has already exited and dropped its receiver.
                if completion_tx.send(completion).await.is_err() {
                    log::debug!(
                        "subc attach: dropped RouteBind completion for route {} after loop exit",
                        completion_route_channel
                    );
                }
            });

            Ok(())
        }
    }
}
1654
1655fn install_bash_compressor(ctx: &AppContext) {
1656    // Mirrors main.rs per-actor compressor installation for subc-created actors.
1657    let filter_registry_handle = ctx.shared_filter_registry();
1658    let compress_flag = ctx.bash_compress_flag();
1659    ctx.bash_background().set_compressor_with_exit_code(
1660        move |command: &str, output: String, exit_code: Option<i32>| {
1661            if !compress_flag.load(std::sync::atomic::Ordering::Relaxed) {
1662                return crate::compress::CompressionResult::new(output);
1663            }
1664            let registry_guard = match filter_registry_handle.read() {
1665                Ok(g) => g,
1666                Err(poisoned) => poisoned.into_inner(),
1667            };
1668            crate::compress::compress_with_registry_exit_code(
1669                command,
1670                &output,
1671                exit_code,
1672                &registry_guard,
1673            )
1674        },
1675    );
1676}
1677
1678fn diagnostics_on_edit_from_tiers(tiers: &[ConfigTier]) -> bool {
1679    let mut diagnostics_on_edit = false;
1680    for tier in tiers {
1681        if let Some(value) = diagnostics_on_edit_from_doc(&tier.doc) {
1682            diagnostics_on_edit = value;
1683        }
1684    }
1685    diagnostics_on_edit
1686}
1687
1688fn diagnostics_on_edit_from_doc(doc: &str) -> Option<bool> {
1689    let stripped = strip_jsonc_for_subc(doc);
1690    let value = serde_json::from_str::<Value>(&stripped).ok()?;
1691    value
1692        .get("lsp")
1693        .and_then(Value::as_object)?
1694        .get("diagnostics_on_edit")
1695        .and_then(Value::as_bool)
1696}
1697
1698fn strip_jsonc_for_subc(source: &str) -> String {
1699    strip_trailing_commas_for_subc(&strip_jsonc_comments_for_subc(source))
1700}
1701
/// Removes `//` line comments and `/* ... */` block comments from a JSONC
/// document. Text inside JSON string literals, including escaped quotes, is
/// left untouched.
///
/// - A line comment's terminating newline is kept.
/// - Newlines inside a block comment are kept, so the document's line
///   structure does not change.
/// - Each block comment is replaced by one space, so it still separates the
///   tokens on either side of it (`1/**/2` must not collapse into `12`).
/// - An unterminated block comment swallows the rest of the input.
fn strip_jsonc_comments_for_subc(source: &str) -> String {
    let mut output = String::with_capacity(source.len());
    let mut chars = source.chars().peekable();
    let mut in_string = false;
    let mut escaped = false;

    while let Some(ch) = chars.next() {
        if in_string {
            output.push(ch);
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match (ch, chars.peek().copied()) {
            ('"', _) => {
                in_string = true;
                output.push(ch);
            }
            ('/', Some('/')) => {
                chars.next();
                // Drop the comment text but keep the newline that ends it.
                for next in chars.by_ref() {
                    if next == '\n' {
                        output.push('\n');
                        break;
                    }
                }
            }
            ('/', Some('*')) => {
                chars.next();
                // A comment is a separator: never glue the surrounding tokens.
                output.push(' ');
                // `previous` starts as a sentinel so `/*/` does not close.
                let mut previous = '\0';
                for next in chars.by_ref() {
                    if next == '\n' {
                        output.push('\n');
                    }
                    if previous == '*' && next == '/' {
                        break;
                    }
                    previous = next;
                }
            }
            _ => output.push(ch),
        }
    }

    output
}
1761
/// Drops every comma that sits outside a string literal and is followed only
/// by whitespace and then `}` or `]`. All other characters, including the
/// whitespace after a dropped comma, are copied through unchanged.
fn strip_trailing_commas_for_subc(source: &str) -> String {
    let mut result = String::with_capacity(source.len());
    let mut inside_string = false;
    let mut after_backslash = false;

    for (offset, c) in source.char_indices() {
        if inside_string {
            result.push(c);
            match c {
                _ if after_backslash => after_backslash = false,
                '\\' => after_backslash = true,
                '"' => inside_string = false,
                _ => {}
            }
            continue;
        }

        match c {
            '"' => {
                inside_string = true;
                result.push(c);
            }
            // `,` is a single byte, so `offset + 1` is a char boundary.
            ',' if matches!(
                source[offset + 1..].trim_start().chars().next(),
                Some('}' | ']')
            ) => {}
            _ => result.push(c),
        }
    }

    result
}
1808
1809async fn send_route_bind_error(
1810    tx: &mpsc::Sender<Frame>,
1811    frame: &Frame,
1812    code: &str,
1813    message: &str,
1814) -> Result<(), SubcError> {
1815    send_route_bind_error_parts(
1816        tx,
1817        frame.header.ver,
1818        frame.header.corr,
1819        frame.header.flags,
1820        code,
1821        message,
1822    )
1823    .await
1824}
1825
1826async fn send_route_bind_error_parts(
1827    tx: &mpsc::Sender<Frame>,
1828    ver: u8,
1829    corr: u64,
1830    flags: Flags,
1831    code: &str,
1832    message: &str,
1833) -> Result<(), SubcError> {
1834    let response = build_error_frame(ver, 0, corr, flags, code, message)?;
1835    send_frame(tx, response).await?;
1836    log::warn!("subc attach: route bind rejected ({code}): {message}");
1837    Ok(())
1838}
1839
1840/// Route-channel tool call: `{name, arguments}` → executor lane → dispatch to
1841/// the sync command core → wrap the structured Response in a CallToolResult
1842/// `{content, isError}`. v1 mapping: the whole `{success, ...}` Response
1843/// serialized into ONE text block; `isError` carries `success == false`.
1844async fn handle_tool_call(
1845    tx: &mpsc::Sender<Frame>,
1846    frame: &Frame,
1847    routes: &HashMap<RouteChannel, RouteIdentity>,
1848    pending_binds: &HashMap<RouteChannel, PendingBind>,
1849    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1850    executor: &Arc<Executor>,
1851    shutdown: &Arc<Notify>,
1852    dispatch: DispatchFn,
1853    allow_native_passthrough: bool,
1854) -> Result<(), SubcError> {
1855    let route_id = route_key(frame.header.channel);
1856    if pending_binds.contains_key(&route_id) {
1857        let error = build_error_frame(
1858            frame.header.ver,
1859            frame.header.channel,
1860            frame.header.corr,
1861            frame.header.flags,
1862            "route_not_bound",
1863            "route is not bound before tool call",
1864        )?;
1865        return send_frame(tx, error).await;
1866    }
1867
1868    let Some(identity) = routes.get(&route_id).cloned() else {
1869        let error = build_error_frame(
1870            frame.header.ver,
1871            frame.header.channel,
1872            frame.header.corr,
1873            frame.header.flags,
1874            "route_not_bound",
1875            "route is not bound before tool call",
1876        )?;
1877        return send_frame(tx, error).await;
1878    };
1879    if let Some(meta) = live_roots.get_mut(&identity.root) {
1880        meta.touch();
1881    }
1882
1883    let call = serde_json::from_slice::<ToolCallRequest>(&frame.body).map_err(SubcError::Json)?;
1884    let bare_name = call.name.clone();
1885    let format_context = crate::subc_format::FormatContext::from_tool_call(
1886        &bare_name,
1887        &call.arguments,
1888        identity.project_root.as_path(),
1889    );
1890
1891    let request_id = format!("subc-{}-{}", frame.header.channel, frame.header.corr);
1892    let project_root = identity.project_root.as_path();
1893    let diagnostics_on_edit = live_roots
1894        .get(&identity.root)
1895        .map(|meta| meta.diagnostics_on_edit)
1896        .unwrap_or(false);
1897    let translate_context = crate::subc_translate::TranslateContext {
1898        diagnostics_on_edit,
1899    };
1900    let (command, translated_args) = match crate::subc_translate::subc_translate_with_context(
1901        &call.name,
1902        &call.arguments,
1903        project_root,
1904        translate_context,
1905    ) {
1906        Ok(t) => (t.command, t.args),
1907        // A core agent tool that fails translation is a real client error
1908        // (e.g. `edit` with no resolvable mode) — surface it, never guess.
1909        Err(err) if is_subc_agent_core_tool(&call.name) => {
1910            let response = Response::error(request_id.clone(), err.code, err.message);
1911            let response_frame = build_tool_response_frame(
1912                frame.header.ver,
1913                frame.header.channel,
1914                frame.header.corr,
1915                frame.header.flags,
1916                &bare_name,
1917                &format_context,
1918                &response,
1919            )?;
1920            return send_frame(tx, response_frame).await;
1921        }
1922        // A non-core name is NOT in the tool manifest. AFT fails closed and
1923        // does not trust subc to enforce the manifest: rejecting here is the
1924        // defense-in-depth backstop that prevents a forwarded native command
1925        // (e.g. `configure`, which would reach handle_configure and bypass
1926        // the RouteBind config-trust cap) from ever reaching dispatch. Only
1927        // the integration-test harness (run_subc_mode_for_test) opens this to
1928        // drive synthetic native commands through the executor.
1929        Err(_) if !allow_native_passthrough => {
1930            log::warn!(
1931                "subc tool call: rejecting non-manifest tool name {:?} on route {} (fail-closed)",
1932                call.name,
1933                frame.header.channel
1934            );
1935            let response = Response::error(
1936                request_id.clone(),
1937                "unknown_tool",
1938                format!("tool {:?} is not in the AFT tool manifest", call.name),
1939            );
1940            let response_frame = build_tool_response_frame(
1941                frame.header.ver,
1942                frame.header.channel,
1943                frame.header.corr,
1944                frame.header.flags,
1945                &bare_name,
1946                &format_context,
1947                &response,
1948            )?;
1949            return send_frame(tx, response_frame).await;
1950        }
1951        // Test-only: non-core names are native commands (the integration-test
1952        // synthetic tools) — pass them through verbatim.
1953        Err(_) => {
1954            let map = call.arguments.as_object().cloned().unwrap_or_default();
1955            (call.name.clone(), map)
1956        }
1957    };
1958
1959    let lane = command_lane(&command);
1960    let command_for_finalize = command.clone();
1961    let session_for_finalize = identity.session.clone();
1962    let mut map = translated_args;
1963    map.insert("id".to_string(), json!(request_id.clone()));
1964    map.insert("command".to_string(), json!(command));
1965    map.insert("session_id".to_string(), json!(identity.session.clone()));
1966
1967    let raw_req = match serde_json::from_value::<RawRequest>(Value::Object(map)) {
1968        Ok(req) => req,
1969        Err(error) => {
1970            let response = Response::error(
1971                request_id.clone(),
1972                "invalid_request",
1973                format!("failed to build request from tool call: {error}"),
1974            );
1975            let response_frame = build_tool_response_frame(
1976                frame.header.ver,
1977                frame.header.channel,
1978                frame.header.corr,
1979                frame.header.flags,
1980                &bare_name,
1981                &format_context,
1982                &response,
1983            )?;
1984            return send_frame(tx, response_frame).await;
1985        }
1986    };
1987
1988    let bare_name_for_frame = bare_name.clone();
1989    let format_context_for_frame = format_context;
1990    let rx = executor.submit_async(
1991        identity.root,
1992        lane,
1993        request_id.clone(),
1994        Box::new(move |ctx| {
1995            log_ctx::with_session(Some(session_for_finalize.clone()), || {
1996                let mut response = dispatch(raw_req, ctx);
1997                crate::response_finalize::attach_bg_completions(
1998                    &mut response,
1999                    ctx,
2000                    &session_for_finalize,
2001                    &command_for_finalize,
2002                );
2003                crate::response_finalize::attach_status_bar(
2004                    &mut response,
2005                    ctx,
2006                    &command_for_finalize,
2007                );
2008                response
2009            })
2010        }),
2011    );
2012    let completion_tx = tx.clone();
2013    let completion_shutdown = Arc::clone(shutdown);
2014    let route_channel = frame.header.channel;
2015    let corr = frame.header.corr;
2016    let flags = frame.header.flags;
2017    let ver = frame.header.ver;
2018    let is_mutating = lane == Lane::Mutating;
2019    tokio::spawn(async move {
2020        let response = await_executor_response(rx, request_id.clone()).await;
2021        let fatal = is_mutating && response_is_internal_error(&response);
2022        match build_tool_response_frame(
2023            ver,
2024            route_channel,
2025            corr,
2026            flags,
2027            &bare_name_for_frame,
2028            &format_context_for_frame,
2029            &response,
2030        ) {
2031            Ok(response_frame) => {
2032                let _ = completion_tx.send(response_frame).await;
2033            }
2034            Err(error) => {
2035                log::error!("subc attach: failed to build tool response frame: {error}");
2036            }
2037        }
2038        if fatal {
2039            signal_fatal_teardown(
2040                &completion_tx,
2041                Some(route_channel),
2042                ver,
2043                corr,
2044                &completion_shutdown,
2045            )
2046            .await;
2047        }
2048    });
2049    Ok(())
2050}
2051
2052fn submit_maintenance_drain(
2053    executor: &Arc<Executor>,
2054    root_id: ProjectRootId,
2055    completion_tx: &mpsc::Sender<(ProjectRootId, Response)>,
2056) {
2057    let request_id = format!(
2058        "subc-maintenance-drain-{}",
2059        root_id.as_path().to_string_lossy()
2060    );
2061    let response_id = request_id.clone();
2062    let completion_root_id = root_id.clone();
2063    let rx = executor.submit_async(
2064        root_id,
2065        Lane::Mutating,
2066        request_id.clone(),
2067        Box::new(move |ctx| {
2068            runtime_drain::drain_configure_warning_events(ctx);
2069            runtime_drain::drain_search_index_events(ctx);
2070            runtime_drain::drain_callgraph_store_events(ctx);
2071            runtime_drain::drain_semantic_index_events(ctx);
2072            runtime_drain::drain_semantic_refresh_events(ctx);
2073            runtime_drain::drain_inspect_events(ctx);
2074            runtime_drain::drain_watcher_events(ctx);
2075            runtime_drain::drain_lsp_events(ctx);
2076            Response::success(response_id, json!({ "drained": true }))
2077        }),
2078    );
2079    let completion_tx = completion_tx.clone();
2080    tokio::spawn(async move {
2081        let response = await_executor_response(rx, request_id).await;
2082        let _ = completion_tx.send((completion_root_id, response)).await;
2083    });
2084}
2085
2086async fn await_executor_response(rx: oneshot::Receiver<Response>, request_id: String) -> Response {
2087    rx.await
2088        .unwrap_or_else(|_| Response::error(request_id, "internal_error", "executor dropped"))
2089}
2090
2091fn build_tool_response_frame(
2092    ver: u8,
2093    route_channel: u16,
2094    corr: u64,
2095    flags: Flags,
2096    bare_name: &str,
2097    format_context: &crate::subc_format::FormatContext,
2098    response: &Response,
2099) -> Result<Frame, SubcError> {
2100    let text =
2101        crate::subc_format::format_response_with_context(bare_name, response, format_context);
2102    let is_error = !response.success;
2103    let result = json!({
2104        "content": [{ "type": "text", "text": text }],
2105        "isError": is_error,
2106    });
2107    let body = serde_json::to_vec(&result).map_err(SubcError::Json)?;
2108
2109    Frame::build_with_version(ver, FrameType::Response, flags, route_channel, corr, body)
2110        .map_err(SubcError::FrameBuild)
2111}
2112
2113fn build_error_frame(
2114    ver: u8,
2115    channel: u16,
2116    corr: u64,
2117    flags: Flags,
2118    code: &str,
2119    message: &str,
2120) -> Result<Frame, SubcError> {
2121    let body = serde_json::to_vec(&ErrorBody {
2122        code: code.to_string(),
2123        message: message.to_string(),
2124    })
2125    .map_err(SubcError::Json)?;
2126    Frame::build_with_version(ver, FrameType::Error, flags, channel, corr, body)
2127        .map_err(SubcError::FrameBuild)
2128}
2129
2130fn build_goodbye_frame(ver: u8, channel: u16, corr: u64) -> Result<Frame, SubcError> {
2131    Frame::build_with_version(
2132        ver,
2133        FrameType::Goodbye,
2134        control_flags(),
2135        channel,
2136        corr,
2137        Vec::new(),
2138    )
2139    .map_err(SubcError::FrameBuild)
2140}
2141
2142async fn signal_fatal_teardown(
2143    tx: &mpsc::Sender<Frame>,
2144    route_channel: Option<u16>,
2145    ver: u8,
2146    corr: u64,
2147    shutdown: &Arc<Notify>,
2148) {
2149    if let Some(route_channel) = route_channel {
2150        if let Ok(frame) = build_goodbye_frame(ver, route_channel, corr) {
2151            if let Err(error) = send_frame(tx, frame).await {
2152                log::warn!(
2153                    "subc attach: failed to queue fatal route Goodbye for route {route_channel}: {error}"
2154                );
2155            }
2156        }
2157    }
2158    if let Ok(frame) = build_goodbye_frame(ver, 0, 0) {
2159        if let Err(error) = send_frame(tx, frame).await {
2160            log::warn!("subc attach: failed to queue fatal channel-0 Goodbye: {error}");
2161        }
2162    }
2163    shutdown.notify_one();
2164}
2165
2166fn response_message(response: &Response, fallback: &str) -> String {
2167    response
2168        .data
2169        .get("message")
2170        .and_then(Value::as_str)
2171        .map(ToOwned::to_owned)
2172        .unwrap_or_else(|| fallback.to_string())
2173}
2174
2175fn response_is_internal_error(response: &Response) -> bool {
2176    !response.success && response.data.get("code").and_then(Value::as_str) == Some("internal_error")
2177}
2178
/// True when `name` is one of the bare agent-facing core tools. These are the
/// same eight names that `build_manifest` advertises. The comparison is exact
/// and case-sensitive.
fn is_subc_agent_core_tool(name: &str) -> bool {
    const CORE_TOOLS: [&str; 8] = [
        "status", "read", "write", "edit", "grep", "search", "outline", "inspect",
    ];
    CORE_TOOLS.contains(&name)
}
2185
/// Chooses the executor lane for a (translated) command name.
///
/// - `PureRead`: plain reads, plus the lazy reads that may populate caches.
/// - `SerialLspStatus`: status/inspect and the LSP query commands.
/// - `HeavyInit`: semantic search and the call-graph / trace family.
/// - `Mutating`: every other command. This covers both the explicitly listed
///   writers and any name not matched here.
///
/// NOTE(review): the exact scheduling guarantees of each lane live in
/// `crate::executor` and are not visible here.
fn command_lane(command: &str) -> Lane {
    match command {
        "ping"
        | "version"
        | "echo"
        | "bash_drain_completions"
        | "bash_regex_match"
        | "db_get_state"
        | "db_get_host_state"
        | "read"
        | "undo_preview"
        | "edit_history"
        | "checkpoint_paths"
        | "list_checkpoints"
        | "glob"
        | "grep"
        | "git_conflicts"
        | "ast_search" => Lane::PureRead,

        // Lazy reads mutate parser/terminal/url caches on a miss, but are still
        // classified onto the reader pool; install races are handled at the
        // individual cache sites.
        "bash_status" | "outline" | "zoom" => Lane::PureRead,

        "status"
        | "inspect"
        | "lsp_diagnostics"
        | "lsp_inspect"
        | "lsp_hover"
        | "lsp_goto_definition"
        | "lsp_find_references"
        | "lsp_prepare_rename" => Lane::SerialLspStatus,

        "semantic_search" | "search" | "callers" | "impact" | "call_tree" | "trace_to"
        | "trace_to_symbol" | "trace_data" | "inspect_tier2_run" => Lane::HeavyInit,

        // Known writers, spelled out for documentation. The wildcard arm
        // below maps them to Mutating as well.
        // NOTE(review): `list_filters` reads like a query but is classified
        // Mutating; confirm whether that is intentional.
        "bash"
        | "bash_ack_completions"
        | "bash_notify"
        | "bash_unnotify"
        | "bash_promote"
        | "bash_kill"
        | "bash_write"
        | "db_set_state"
        | "db_set_host_state"
        | "undo"
        | "checkpoint"
        | "restore_checkpoint"
        | "write"
        | "delete_file"
        | "move_file"
        | "edit"
        | "edit_symbol"
        | "edit_match"
        | "batch"
        | "add_import"
        | "remove_import"
        | "organize_imports"
        | "configure"
        | "move_symbol"
        | "extract_function"
        | "inline_symbol"
        | "ast_replace"
        | "lsp_rename"
        | "list_filters"
        | "trust_filter_project"
        | "untrust_filter_project"
        | "snapshot" => Lane::Mutating,

        // Unknown commands go to Mutating, presumably the most conservative
        // lane.
        _ => Lane::Mutating,
    }
}
2258
/// JSON body of a route-channel tool-call frame, decoded in `handle_tool_call`.
#[derive(Debug, Deserialize)]
struct ToolCallRequest {
    /// Bare tool name (e.g. `read`, `edit`); the gateway owns the `aft_` prefix.
    name: String,
    /// Tool arguments. Defaults to `Value::Null` when the field is absent.
    #[serde(default)]
    arguments: Value,
}
2265
2266static SUBC_TOOL_SCHEMAS: LazyLock<serde_json::Map<String, Value>> = LazyLock::new(|| {
2267    serde_json::from_str(include_str!("subc_tool_schemas.json"))
2268        .unwrap_or_else(|e| panic!("subc_tool_schemas.json: {e}"))
2269});
2270
2271fn tool_schema(name: &str) -> Value {
2272    SUBC_TOOL_SCHEMAS.get(name).cloned().unwrap_or_else(|| {
2273        log::warn!(
2274            "subc build_manifest: missing embedded schema for tool {name:?}; using placeholder"
2275        );
2276        json!({ "type": "object" })
2277    })
2278}
2279
2280/// AFT's subc-mode capability manifest. BARE tool names (the gateway owns the
2281/// `aft_` prefix); ModuleManaged concurrency (AFT schedules internally);
2282/// FirstParty trust. Minimal-but-conformant tool set for the spike — the full
2283/// bare set is locked before the gateway fronts AFT.
2284fn build_manifest() -> ModuleManifest {
2285    let tool = |name: &str, execution_mode: ExecutionMode| Tool {
2286        name: name.to_string(),
2287        execution_mode,
2288        schema: tool_schema(name),
2289    };
2290    // execution_mode keys on externally-observable side effects, NOT internal
2291    // ctx mutation: the readers warm AFT's own index/cache/symbol artifacts
2292    // (internal), not the user's workspace, so they are Pure. Only edit/write
2293    // produce observable file writes -> Mutating. There is no bash tool in this
2294    // core, so Unfenceable is unused here (it stays in the enum for bash/other
2295    // modules).
2296    ModuleManifest {
2297        module_id: "aft".to_string(),
2298        module_version: env!("CARGO_PKG_VERSION").to_string(),
2299        protocol_ver: PROTOCOL_VERSION,
2300        trust_tier: TrustTier::FirstParty,
2301        provides: vec![ProviderRole::ToolProvider {
2302            tools: vec![
2303                tool("status", ExecutionMode::Pure),
2304                tool("read", ExecutionMode::Pure),
2305                tool("grep", ExecutionMode::Pure),
2306                tool("search", ExecutionMode::Pure),
2307                tool("outline", ExecutionMode::Pure),
2308                tool("inspect", ExecutionMode::Pure),
2309                tool("edit", ExecutionMode::Mutating),
2310                tool("write", ExecutionMode::Mutating),
2311            ],
2312            identity_scope: vec![IdentityScope::Session, IdentityScope::Project],
2313            concurrency: Concurrency::ModuleManaged,
2314            emits_push: true,
2315            sub_supervises: true,
2316        }],
2317        consumes: Vec::new(),
2318        scheduled_tasks: Vec::new(),
2319        bindings: Bindings {
2320            storage: StorageBinding {
2321                kind: StorageKind::Sqlite,
2322                scope: StorageScope::Project,
2323                owns_schema: true,
2324            },
2325            config: ConfigBinding {
2326                source: ConfigSource::SubcMediated,
2327                tiers: vec!["user".to_string(), "project".to_string()],
2328                expansion: std::collections::BTreeMap::new(),
2329            },
2330            vault_grants: Vec::new(),
2331            identity: IdentityBinding {
2332                requires: vec![IdentityScope::Project],
2333                optional: vec![IdentityScope::Session],
2334            },
2335        },
2336    }
2337}
2338
2339fn control_flags() -> Flags {
2340    Flags::new(false, Priority::Passive, false)
2341}
2342
2343#[cfg(test)]
2344mod tests {
2345    use super::*;
2346    use crate::bash_background::BgTaskStatus;
2347    use crate::protocol::{
2348        BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, ConfigureWarningsFrame,
2349        ProgressFrame, StatusChangedFrame,
2350    };
2351    use serde_json::json;
2352
2353    fn test_root(name: &str) -> (tempfile::TempDir, ProjectRootId) {
2354        let dir = tempfile::Builder::new()
2355            .prefix(name)
2356            .tempdir()
2357            .expect("temp root");
2358        let root = ProjectRootId::from_path(dir.path()).expect("project root id");
2359        (dir, root)
2360    }
2361
2362    fn status_frame(seq: u64) -> PushFrame {
2363        status_frame_with_session(seq, None)
2364    }
2365
2366    fn status_frame_with_session(seq: u64, session_id: Option<&str>) -> PushFrame {
2367        PushFrame::StatusChanged(StatusChangedFrame {
2368            frame_type: "status_changed",
2369            session_id: session_id.map(str::to_string),
2370            snapshot: json!({ "seq": seq }),
2371        })
2372    }
2373
2374    fn completion_frame(task_id: &str) -> PushFrame {
2375        completion_frame_with_session(task_id, "session-1")
2376    }
2377
2378    fn completion_frame_with_session(task_id: &str, session_id: &str) -> PushFrame {
2379        PushFrame::BashCompleted(BashCompletedFrame {
2380            frame_type: "bash_completed",
2381            task_id: task_id.to_string(),
2382            session_id: session_id.to_string(),
2383            status: BgTaskStatus::Completed,
2384            exit_code: Some(0),
2385            command: format!("echo {task_id}"),
2386            output_preview: String::new(),
2387            output_truncated: false,
2388            original_tokens: None,
2389            compressed_tokens: None,
2390            tokens_skipped: false,
2391        })
2392    }
2393
2394    fn long_running_frame(task_id: &str, elapsed_ms: u64) -> PushFrame {
2395        long_running_frame_with_session(task_id, "session-1", elapsed_ms)
2396    }
2397
2398    fn long_running_frame_with_session(
2399        task_id: &str,
2400        session_id: &str,
2401        elapsed_ms: u64,
2402    ) -> PushFrame {
2403        PushFrame::BashLongRunning(BashLongRunningFrame {
2404            frame_type: "bash_long_running",
2405            task_id: task_id.to_string(),
2406            session_id: session_id.to_string(),
2407            command: format!("sleep {elapsed_ms}"),
2408            elapsed_ms,
2409        })
2410    }
2411
2412    fn pattern_match_frame(session_id: &str) -> PushFrame {
2413        PushFrame::BashPatternMatch(BashPatternMatchFrame {
2414            frame_type: "bash_pattern_match",
2415            task_id: "task-pattern".to_string(),
2416            session_id: session_id.to_string(),
2417            watch_id: "watch-1".to_string(),
2418            match_text: "needle".to_string(),
2419            match_offset: 7,
2420            context: "haystack needle".to_string(),
2421            once: true,
2422            reason: "pattern_match",
2423        })
2424    }
2425
2426    fn configure_warnings_frame(session_id: Option<&str>) -> PushFrame {
2427        PushFrame::ConfigureWarnings(ConfigureWarningsFrame {
2428            frame_type: "configure_warnings",
2429            session_id: session_id.map(str::to_string),
2430            project_root: "/tmp/subc-test".to_string(),
2431            source_file_count: 0,
2432            source_file_count_exceeds_max: false,
2433            max_callgraph_files: 0,
2434            warnings: Vec::new(),
2435        })
2436    }
2437
2438    fn route_identity(root: &ProjectRootId, session_id: &str) -> RouteIdentity {
2439        RouteIdentity {
2440            root: root.clone(),
2441            project_root: root.as_path().to_path_buf(),
2442            harness: "opencode".to_string(),
2443            session: session_id.to_string(),
2444        }
2445    }
2446
2447    fn progress_frame(request_id: &str, kind: ProgressKind, chunk: &str) -> PushFrame {
2448        PushFrame::Progress(ProgressFrame::new(request_id, kind, chunk))
2449    }
2450
2451    fn status_seq(frame: &PushFrame) -> Option<u64> {
2452        match frame {
2453            PushFrame::StatusChanged(status) => status.snapshot.get("seq").and_then(|v| v.as_u64()),
2454            _ => None,
2455        }
2456    }
2457
2458    fn completion_task(frame: &PushFrame) -> Option<&str> {
2459        match frame {
2460            PushFrame::BashCompleted(completion) => Some(completion.task_id.as_str()),
2461            _ => None,
2462        }
2463    }
2464
2465    fn push_frame_task_id(frame: &Frame) -> Option<String> {
2466        let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2467        body.get("task_id")
2468            .and_then(serde_json::Value::as_str)
2469            .map(str::to_string)
2470    }
2471
2472    #[test]
2473    fn frame_classification_matches_push_delivery_contract() {
2474        let completion = completion_frame_with_session("done", "session-a");
2475        assert_eq!(frame_session(&completion), Some("session-a"));
2476        assert!(frame_is_reliable(&completion));
2477
2478        let long_running = long_running_frame_with_session("long", "session-b", 42);
2479        assert_eq!(frame_session(&long_running), Some("session-b"));
2480        assert!(!frame_is_reliable(&long_running));
2481
2482        let pattern_match = pattern_match_frame("session-c");
2483        assert_eq!(frame_session(&pattern_match), Some("session-c"));
2484        assert!(frame_is_reliable(&pattern_match));
2485
2486        let tagged_warnings = configure_warnings_frame(Some("session-d"));
2487        assert_eq!(frame_session(&tagged_warnings), Some("session-d"));
2488        assert!(frame_is_reliable(&tagged_warnings));
2489
2490        let untagged_warnings = configure_warnings_frame(None);
2491        assert_eq!(frame_session(&untagged_warnings), None);
2492        assert!(frame_is_reliable(&untagged_warnings));
2493
2494        let tagged_status = status_frame_with_session(1, Some("session-e"));
2495        assert_eq!(frame_session(&tagged_status), Some("session-e"));
2496        assert!(!frame_is_reliable(&tagged_status));
2497
2498        let project_status = status_frame(2);
2499        assert_eq!(frame_session(&project_status), None);
2500        assert!(!frame_is_reliable(&project_status));
2501
2502        let progress = progress_frame("request-1", ProgressKind::Stdout, "chunk");
2503        assert_eq!(frame_session(&progress), None);
2504        assert!(!frame_is_reliable(&progress));
2505    }
2506
    /// Session-scoped reliable frames must reach only the channel bound to the
    /// tagged session, while project-scoped lossy frames fan out to every
    /// channel bound under the same root.
    #[test]
    fn fan_out_push_frame_routes_session_scoped_and_project_scoped_frames() {
        let (_root_dir, root) = test_root("subc-session-routing-root");
        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(8);
        // Two sessions under one root, bound to route channels 1 and 2.
        let identity1 = route_identity(&root, "session-1");
        let identity2 = route_identity(&root, "session-2");
        let mut routes = HashMap::new();
        routes.insert(route_key(1), identity1.clone());
        routes.insert(route_key(2), identity2.clone());
        let mut root_channels = HashMap::new();
        root_channels.insert(root.clone(), HashSet::from([route_key(1), route_key(2)]));
        let mut session_identity = HashMap::new();
        remember_session_identity(&mut session_identity, &identity1);
        remember_session_identity(&mut session_identity, &identity2);
        let mut retry_buffer = HashMap::new();
        let mut push_buffer = HashMap::new();

        // Reliable completion tagged for session-1 only.
        let session_result = fan_out_reliable_push_frame(
            &writer_tx,
            &routes,
            &root_channels,
            &session_identity,
            &mut retry_buffer,
            &mut push_buffer,
            &root,
            &completion_frame_with_session("session-only", "session-1"),
        );
        assert_eq!(
            session_result,
            FanOutResult {
                matched_channels: 1,
                sent_frames: 1,
            }
        );
        // The writer (capacity 8) had room, so nothing is parked for retry or replay.
        assert!(retry_buffer.is_empty());
        assert!(push_buffer.is_empty());
        let session_push = writer_rx.try_recv().expect("session push queued");
        assert_eq!(session_push.header.ty, FrameType::Push);
        assert_eq!(session_push.header.channel, 1);
        assert!(
            writer_rx.try_recv().is_err(),
            "session-scoped frame must not broadcast to sibling sessions"
        );

        // Untagged status frame is project-scoped: one copy per bound channel.
        let project_result =
            fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(9));
        assert_eq!(
            project_result,
            FanOutResult {
                matched_channels: 2,
                sent_frames: 2,
            }
        );
        // Channel order of the fan-out is not asserted, so compare as a set.
        let project_channels: HashSet<_> = [
            writer_rx
                .try_recv()
                .expect("first project push")
                .header
                .channel,
            writer_rx
                .try_recv()
                .expect("second project push")
                .header
                .channel,
        ]
        .into_iter()
        .collect();
        assert_eq!(project_channels, HashSet::from([1, 2]));
        assert!(writer_rx.try_recv().is_err());
    }
2577
2578    #[test]
2579    fn push_buffer_drops_oldest_per_replay_key() {
2580        let (_root_dir, root) = test_root("subc-buffer-bound-root");
2581        let key = ReplayKey {
2582            root,
2583            harness: "opencode".to_string(),
2584            session: "session-1".to_string(),
2585        };
2586        let mut push_buffer = HashMap::new();
2587        let total = PUSH_BUFFER_MAX_PER_KEY + 3;
2588
2589        for index in 0..total {
2590            buffer_push_frame(
2591                &mut push_buffer,
2592                key.clone(),
2593                completion_frame(&format!("task-{index}")),
2594            );
2595        }
2596
2597        let buffered = push_buffer.get(&key).expect("buffer entry");
2598        assert_eq!(buffered.len(), PUSH_BUFFER_MAX_PER_KEY);
2599        let tasks: Vec<String> = buffered
2600            .iter()
2601            .filter_map(completion_task)
2602            .map(str::to_string)
2603            .collect();
2604        assert_eq!(tasks.first().map(String::as_str), Some("task-3"));
2605        assert_eq!(
2606            tasks.last().map(String::as_str),
2607            Some(format!("task-{}", total - 1).as_str())
2608        );
2609    }
2610
2611    #[test]
2612    fn replay_buffered_push_frames_drains_to_bound_channel() {
2613        let (_root_dir, root) = test_root("subc-buffer-replay-root");
2614        let key = ReplayKey {
2615            root,
2616            harness: "opencode".to_string(),
2617            session: "session-1".to_string(),
2618        };
2619        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);
2620        let mut push_buffer = HashMap::new();
2621        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-a"));
2622        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-b"));
2623
2624        let replayed =
2625            replay_buffered_push_frames(&writer_tx, route_key(3), &mut push_buffer, &key);
2626
2627        assert_eq!(replayed, 2);
2628        assert!(!push_buffer.contains_key(&key));
2629        for expected_task in ["task-a", "task-b"] {
2630            let frame = writer_rx.try_recv().expect("replayed push");
2631            assert_eq!(frame.header.ty, FrameType::Push);
2632            assert_eq!(frame.header.channel, 3);
2633            let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2634            assert_eq!(body["task_id"].as_str(), Some(expected_task));
2635        }
2636        assert!(writer_rx.try_recv().is_err());
2637    }
2638
2639    #[test]
2640    fn coalesce_push_batch_collapses_lossy_and_preserves_reliable_fifo() {
2641        let (_root_dir, root) = test_root("subc-coalesce-root");
2642        let (_other_dir, other_root) = test_root("subc-coalesce-other");
2643
2644        let output = coalesce_push_batch(vec![
2645            (root.clone(), status_frame(1)),
2646            (root.clone(), completion_frame("task-1")),
2647            (root.clone(), status_frame(2)),
2648            (root.clone(), completion_frame("task-2")),
2649            (root.clone(), long_running_frame("long-task", 100)),
2650            (root.clone(), long_running_frame("long-task", 200)),
2651            (other_root.clone(), status_frame(9)),
2652        ]);
2653
2654        let completion_tasks: Vec<_> = output
2655            .iter()
2656            .filter_map(|(_, frame)| completion_task(frame))
2657            .collect();
2658        assert_eq!(completion_tasks, vec!["task-1", "task-2"]);
2659
2660        let root_statuses: Vec<_> = output
2661            .iter()
2662            .filter(|(output_root, _)| output_root == &root)
2663            .filter_map(|(_, frame)| status_seq(frame))
2664            .collect();
2665        assert_eq!(root_statuses, vec![2]);
2666
2667        let other_statuses: Vec<_> = output
2668            .iter()
2669            .filter(|(output_root, _)| output_root == &other_root)
2670            .filter_map(|(_, frame)| status_seq(frame))
2671            .collect();
2672        assert_eq!(other_statuses, vec![9]);
2673
2674        let long_running_elapsed: Vec<_> = output
2675            .iter()
2676            .filter_map(|(_, frame)| match frame {
2677                PushFrame::BashLongRunning(long_running) => Some(long_running.elapsed_ms),
2678                _ => None,
2679            })
2680            .collect();
2681        assert_eq!(long_running_elapsed, vec![200]);
2682    }
2683
2684    #[test]
2685    fn coalesce_push_batch_keeps_progress_stream_keys_separate() {
2686        let (_root_dir, root) = test_root("subc-progress-coalesce-root");
2687
2688        let output = coalesce_push_batch(vec![
2689            (
2690                root.clone(),
2691                progress_frame("request-1", ProgressKind::Stdout, "old stdout"),
2692            ),
2693            (
2694                root.clone(),
2695                progress_frame("request-1", ProgressKind::Stderr, "stderr"),
2696            ),
2697            (
2698                root.clone(),
2699                progress_frame("request-2", ProgressKind::Stdout, "other stdout"),
2700            ),
2701            (
2702                root.clone(),
2703                progress_frame("request-1", ProgressKind::Stdout, "new stdout"),
2704            ),
2705        ]);
2706
2707        let progress: Vec<_> = output
2708            .iter()
2709            .filter_map(|(_, frame)| match frame {
2710                PushFrame::Progress(progress) => Some((
2711                    progress.request_id.as_str(),
2712                    match progress.kind {
2713                        ProgressKind::Stdout => "stdout",
2714                        ProgressKind::Stderr => "stderr",
2715                    },
2716                    progress.chunk.as_str(),
2717                )),
2718                _ => None,
2719            })
2720            .collect();
2721
2722        assert_eq!(
2723            progress,
2724            vec![
2725                ("request-1", "stderr", "stderr"),
2726                ("request-2", "stdout", "other stdout"),
2727                ("request-1", "stdout", "new stdout"),
2728            ]
2729        );
2730    }
2731
    /// Reliable frames must not get stuck behind a saturated lossy funnel, and
    /// the progress sender itself must never block.
    #[test]
    fn progress_sender_keeps_reliable_off_saturated_lossy_funnel_without_blocking() {
        let (_root_dir, root) = test_root("subc-push-full-root");
        // Capacity-1 lossy funnel: the second status frame below overflows it.
        let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1);
        // Reliable frames use a separate, unbounded channel.
        let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
        let sender = progress_sender_for_root(
            PushSenders {
                lossy_tx,
                reliable_tx,
            },
            root.clone(),
        );

        // All three sends must return promptly even though the lossy queue is
        // full after the first one.
        let started = Instant::now();
        sender(status_frame(1));
        sender(status_frame(2));
        sender(completion_frame("reliable-after-lossy-full"));
        assert!(
            started.elapsed() < Duration::from_millis(50),
            "saturated push sender must return immediately"
        );

        // Only the first status frame fit; the overflow is dropped, not queued.
        let (received_root, received_frame) =
            lossy_rx.try_recv().expect("first lossy frame queued");
        assert_eq!(received_root, root);
        assert_eq!(status_seq(&received_frame), Some(1));
        assert!(
            lossy_rx.try_recv().is_err(),
            "second lossy frame should be dropped"
        );

        // The completion arrives on the reliable channel, tagged with the
        // sender's root.
        let (reliable_root, reliable_frame) = reliable_rx
            .try_recv()
            .expect("reliable frame bypasses lossy backpressure");
        assert_eq!(reliable_root, root);
        assert_eq!(
            completion_task(&reliable_frame),
            Some("reliable-after-lossy-full")
        );
        assert!(reliable_rx.try_recv().is_err());
    }
2773
    /// Lossy fan-out into a full writer queue drops the push instead of
    /// waiting for capacity.
    #[test]
    fn fan_out_lossy_push_frame_drops_when_writer_is_full_without_blocking() {
        let (_root_dir, root) = test_root("subc-writer-full-root");
        // Fill the capacity-1 writer queue with a Ping so no push can fit.
        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
        writer_tx
            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
            .expect("prefill writer queue");

        let mut root_channels = HashMap::new();
        root_channels.insert(root.clone(), HashSet::from([route_key(7)]));

        // No route identities: the project-scoped status frame is matched
        // through `root_channels` alone (see `matched_channels: 1` below).
        let routes = HashMap::new();
        let started = Instant::now();
        let result =
            fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(1));
        assert!(
            started.elapsed() < Duration::from_millis(50),
            "saturated writer fan-out must return immediately"
        );
        // One channel matched, but nothing could be sent.
        assert_eq!(
            result,
            FanOutResult {
                matched_channels: 1,
                sent_frames: 0,
            }
        );

        // The queue still holds only the original Ping.
        let queued = writer_rx
            .try_recv()
            .expect("prefilled frame remains queued");
        assert_eq!(queued.header.ty, FrameType::Ping);
        assert!(
            writer_rx.try_recv().is_err(),
            "push should be dropped on full writer"
        );
    }
2810
    /// A reliable push that cannot be queued on a full writer is parked in the
    /// channel's retry buffer, not the detach replay buffer. It goes out on a
    /// later retry drain once the writer has room.
    #[test]
    fn reliable_push_backpressure_buffers_and_retries_on_tick() {
        let (_root_dir, root) = test_root("subc-retry-buffer-root");
        let identity = route_identity(&root, "session-1");
        let key = ReplayKey::from_identity(&identity);
        // Route channel 9 is bound to session-1 under `root`.
        let mut routes = HashMap::new();
        routes.insert(route_key(9), identity.clone());
        let mut root_channels = HashMap::new();
        root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
        let mut session_identity = HashMap::new();
        remember_session_identity(&mut session_identity, &identity);
        let mut retry_buffer = HashMap::new();
        let mut push_buffer = HashMap::new();
        // Fill the capacity-1 writer queue so the push cannot be sent directly.
        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
        writer_tx
            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
            .expect("prefill writer queue");

        let result = fan_out_reliable_push_frame(
            &writer_tx,
            &routes,
            &root_channels,
            &session_identity,
            &mut retry_buffer,
            &mut push_buffer,
            &root,
            &completion_frame("retry-task"),
        );

        // Matched but not sent. The frame is parked under channel 9 with the
        // session's replay key.
        assert_eq!(
            result,
            FanOutResult {
                matched_channels: 1,
                sent_frames: 0,
            }
        );
        assert!(push_buffer.is_empty());
        assert_eq!(retry_buffer.get(&route_key(9)).map(VecDeque::len), Some(1));
        assert_eq!(&retry_buffer[&route_key(9)][0].0, &key);

        // Free the slot. The retry drain then delivers the parked frame and
        // removes the channel's (now empty) retry entry.
        let queued = writer_rx.try_recv().expect("prefilled frame");
        assert_eq!(queued.header.ty, FrameType::Ping);
        assert_eq!(
            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
            1
        );
        let retried = writer_rx.try_recv().expect("retried reliable push");
        assert_eq!(retried.header.ty, FrameType::Push);
        assert_eq!(retried.header.channel, 9);
        assert_eq!(push_frame_task_id(&retried).as_deref(), Some("retry-task"));
        assert!(!retry_buffer.contains_key(&route_key(9)));
    }
2863
    /// While a channel has reliable frames parked for retry, newer reliable
    /// frames queue behind them even when the writer has room. This keeps
    /// delivery FIFO.
    #[test]
    fn reliable_push_fifo_gates_new_frames_behind_retry_buffer() {
        let (_root_dir, root) = test_root("subc-retry-fifo-root");
        let identity = route_identity(&root, "session-1");
        let mut routes = HashMap::new();
        routes.insert(route_key(9), identity.clone());
        let mut root_channels = HashMap::new();
        root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
        let mut session_identity = HashMap::new();
        remember_session_identity(&mut session_identity, &identity);
        let mut retry_buffer = HashMap::new();
        let mut push_buffer = HashMap::new();
        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
        writer_tx
            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
            .expect("prefill writer queue");

        let first = completion_frame("fifo-1");
        let second = completion_frame("fifo-2");
        // Writer is full, so `first` is parked in the retry buffer.
        let _ = fan_out_reliable_push_frame(
            &writer_tx,
            &routes,
            &root_channels,
            &session_identity,
            &mut retry_buffer,
            &mut push_buffer,
            &root,
            &first,
        );
        // Free the writer slot before sending `second`.
        let queued = writer_rx.try_recv().expect("free writer capacity");
        assert_eq!(queued.header.ty, FrameType::Ping);

        // `second` must join the retry queue behind `first` rather than go
        // straight to the now-free writer.
        let _ = fan_out_reliable_push_frame(
            &writer_tx,
            &routes,
            &root_channels,
            &session_identity,
            &mut retry_buffer,
            &mut push_buffer,
            &root,
            &second,
        );
        assert!(
            writer_rx.try_recv().is_err(),
            "second reliable frame must not bypass pending retry frame"
        );
        let queued_tasks: Vec<_> = retry_buffer[&route_key(9)]
            .iter()
            .filter_map(|(_, frame)| completion_task(frame))
            .collect();
        assert_eq!(queued_tasks, vec!["fifo-1", "fifo-2"]);

        // With a capacity-1 writer, each drain below sends a single frame in
        // FIFO order. The entry is removed once empty.
        assert_eq!(
            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
            1
        );
        let first_sent = writer_rx.try_recv().expect("first reliable push");
        assert_eq!(push_frame_task_id(&first_sent).as_deref(), Some("fifo-1"));
        assert_eq!(
            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
            1
        );
        let second_sent = writer_rx.try_recv().expect("second reliable push");
        assert_eq!(push_frame_task_id(&second_sent).as_deref(), Some("fifo-2"));
        assert!(!retry_buffer.contains_key(&route_key(9)));
    }
2930
    /// Replay under backpressure sends what fits and keeps the rest buffered in
    /// order. A later replay call resumes where the previous one stopped.
    #[test]
    fn replay_buffered_push_frames_drains_incrementally_on_backpressure() {
        let (_root_dir, root) = test_root("subc-incremental-replay-root");
        let key = ReplayKey {
            root,
            harness: "opencode".to_string(),
            session: "session-1".to_string(),
        };
        // Capacity 2 with a prefilled Ping leaves room for exactly one push.
        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(2);
        writer_tx
            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
            .expect("prefill writer queue");
        let mut push_buffer = HashMap::new();
        for task in ["replay-1", "replay-2", "replay-3"] {
            buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(task));
        }

        // First pass: only replay-1 fits; replay-2 and replay-3 stay buffered.
        assert_eq!(
            replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
            1
        );
        assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(2));
        let remaining: Vec<_> = push_buffer[&key]
            .iter()
            .filter_map(completion_task)
            .collect();
        assert_eq!(remaining, vec!["replay-2", "replay-3"]);

        // Drain both queued frames so the writer has two free slots again.
        let queued = writer_rx.try_recv().expect("prefilled frame");
        assert_eq!(queued.header.ty, FrameType::Ping);
        let first = writer_rx.try_recv().expect("first replayed push");
        assert_eq!(push_frame_task_id(&first).as_deref(), Some("replay-1"));

        // Second pass sends the remaining two and removes the key's buffer.
        assert_eq!(
            replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
            2
        );
        let second = writer_rx.try_recv().expect("second replayed push");
        let third = writer_rx.try_recv().expect("third replayed push");
        assert_eq!(push_frame_task_id(&second).as_deref(), Some("replay-2"));
        assert_eq!(push_frame_task_id(&third).as_deref(), Some("replay-3"));
        assert!(!push_buffer.contains_key(&key));
    }
2974
2975    #[test]
2976    fn goodbye_migrates_retry_buffer_into_detach_replay() {
2977        let (_root_dir, root) = test_root("subc-goodbye-migration-root");
2978        let key = ReplayKey {
2979            root,
2980            harness: "opencode".to_string(),
2981            session: "session-1".to_string(),
2982        };
2983        let mut retry_buffer = HashMap::new();
2984        buffer_retry_frame(
2985            &mut retry_buffer,
2986            route_key(5),
2987            key.clone(),
2988            completion_frame("migrated-task"),
2989        );
2990        let mut push_buffer = HashMap::new();
2991
2992        assert_eq!(
2993            migrate_retry_buffer_to_push_buffer(&mut retry_buffer, route_key(5), &mut push_buffer),
2994            1
2995        );
2996
2997        assert!(!retry_buffer.contains_key(&route_key(5)));
2998        assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(1));
2999        assert_eq!(
3000            completion_task(&push_buffer[&key][0]),
3001            Some("migrated-task")
3002        );
3003    }
3004
    /// A closed writer is a permanent failure. Both the replay drain and the
    /// retry drain drop the affected frames instead of keeping them for another
    /// attempt.
    #[test]
    fn permanent_push_send_failure_is_dropped_not_retried_forever() {
        let (_root_dir, root) = test_root("subc-permanent-failure-root");
        let key = ReplayKey {
            root,
            harness: "opencode".to_string(),
            session: "session-1".to_string(),
        };
        // Dropping the only receiver closes the channel, so every send fails.
        let (writer_tx, writer_rx) = mpsc::channel::<Frame>(1);
        drop(writer_rx);

        // Replay path: nothing is sent and the key's buffer is discarded.
        let mut push_buffer = HashMap::new();
        buffer_push_frame(
            &mut push_buffer,
            key.clone(),
            completion_frame("closed-replay"),
        );
        assert_eq!(
            replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
            0
        );
        assert!(!push_buffer.contains_key(&key));

        // Retry path: the channel's retry queue is discarded the same way.
        let mut retry_buffer = HashMap::new();
        buffer_retry_frame(
            &mut retry_buffer,
            route_key(4),
            key,
            completion_frame("closed-retry"),
        );
        assert_eq!(
            drain_retry_buffer_for_channel(&writer_tx, route_key(4), &mut retry_buffer),
            0
        );
        assert!(!retry_buffer.contains_key(&route_key(4)));
    }
3041
3042    #[test]
3043    fn completed_task_suppresses_stale_long_running_lossy_push() {
3044        let mut completed_tasks = CompletedTaskIds::default();
3045        assert!(!should_drop_lossy_push(
3046            &completed_tasks,
3047            &long_running_frame("stale-task", 100)
3048        ));
3049
3050        completed_tasks.remember("stale-task");
3051
3052        assert!(should_drop_lossy_push(
3053            &completed_tasks,
3054            &long_running_frame("stale-task", 200)
3055        ));
3056        assert!(!should_drop_lossy_push(
3057            &completed_tasks,
3058            &long_running_frame("other-task", 200)
3059        ));
3060    }
3061
    /// A control frame must not wait forever on a writer queue that never
    /// drains. `send_frame` gives up with `WriterBackpressureTimeout`.
    #[tokio::test]
    async fn control_send_times_out_when_writer_queue_remains_full() {
        // `_writer_rx` (not `_`) keeps the receiver alive so the channel stays
        // open. The failure must come from backpressure, not from a closed channel.
        let (writer_tx, _writer_rx) = mpsc::channel::<Frame>(1);
        writer_tx
            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
            .expect("prefill writer queue");
        let started = Instant::now();

        let result = send_frame(
            &writer_tx,
            Frame::build(FrameType::Pong, control_flags(), 0, 2, Vec::new()).unwrap(),
        )
        .await;

        assert!(matches!(result, Err(SubcError::WriterBackpressureTimeout)));
        // Generous bound that only checks the wait is bounded. The actual
        // timeout is defined outside this test.
        assert!(
            started.elapsed() < Duration::from_secs(2),
            "control send guard should be bounded"
        );
    }
3082
    /// Tools that `build_manifest_serves_embedded_tool_schemas` requires to be
    /// present in the manifest with a real object schema, not the bare
    /// placeholder.
    const CORE_TOOLS: [&str; 8] = [
        "status", "read", "grep", "search", "outline", "inspect", "edit", "write",
    ];
3086
3087    fn is_bare_placeholder_schema(schema: &Value) -> bool {
3088        schema == &json!({ "type": "object" })
3089    }
3090
3091    #[test]
3092    fn build_manifest_serves_embedded_tool_schemas() {
3093        let manifest = build_manifest();
3094        let tools = match manifest.provides.first() {
3095            Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3096            _ => panic!("expected ToolProvider"),
3097        };
3098        let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3099        for name in CORE_TOOLS {
3100            let tool = by_name
3101                .get(name)
3102                .unwrap_or_else(|| panic!("missing tool {name}"));
3103            assert!(
3104                !is_bare_placeholder_schema(&tool.schema),
3105                "{name} must not use bare placeholder schema"
3106            );
3107            assert_eq!(
3108                tool.schema.get("type").and_then(|v| v.as_str()),
3109                Some("object"),
3110                "{name} schema must be an object"
3111            );
3112        }
3113
3114        let read = by_name["read"]
3115            .schema
3116            .get("properties")
3117            .and_then(|p| p.as_object());
3118        let read_props = read.expect("read schema properties");
3119        assert!(
3120            read_props.contains_key("filePath"),
3121            "read schema must expose filePath"
3122        );
3123
3124        let status = &by_name["status"].schema;
3125        assert_eq!(
3126            status.get("properties").and_then(|v| v.as_object()),
3127            Some(&serde_json::Map::new()),
3128            "status schema must have empty properties"
3129        );
3130        assert_eq!(
3131            status.get("additionalProperties").and_then(|v| v.as_bool()),
3132            Some(false),
3133            "status schema must forbid additionalProperties"
3134        );
3135    }
3136
3137    #[test]
3138    fn build_manifest_classifies_execution_mode_by_observable_effect() {
3139        let manifest = build_manifest();
3140        let tools = match manifest.provides.first() {
3141            Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3142            _ => panic!("expected ToolProvider"),
3143        };
3144        let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3145
3146        // Readers warm AFT's own index/cache/symbol artifacts (internal ctx
3147        // mutation), not the user's observable workspace, so they are Pure.
3148        for name in ["status", "read", "grep", "search", "outline", "inspect"] {
3149            assert_eq!(
3150                by_name[name].execution_mode,
3151                ExecutionMode::Pure,
3152                "{name} produces no observable side effect and must be Pure"
3153            );
3154        }
3155        // Only edit/write produce observable file writes -> Mutating.
3156        for name in ["edit", "write"] {
3157            assert_eq!(
3158                by_name[name].execution_mode,
3159                ExecutionMode::Mutating,
3160                "{name} writes files and must be Mutating"
3161            );
3162        }
3163    }
3164}
3165
/// Errors produced while attaching AFT to a subc daemon. They cover reading the
/// connection file, connecting and authenticating, the ModuleHello/HelloAck
/// exchange, and frame I/O through the writer task.
#[derive(Debug)]
pub enum SubcError {
    /// Building the tokio runtime for the subc edge failed.
    Runtime(std::io::Error),
    /// The connection file at `path` could not be read.
    ConnectionFile {
        path: PathBuf,
        source: subc_transport::ConnectionFileError,
    },
    /// The connection file at `path` lists no endpoints.
    NoEndpoint {
        path: PathBuf,
    },
    /// The connection file at `path` contains an endpoint string that is not valid.
    InvalidEndpoint {
        path: PathBuf,
        endpoint: String,
    },
    /// Connecting to `endpoint` failed.
    Connect {
        endpoint: String,
        source: std::io::Error,
    },
    /// Authenticating to `endpoint` (the pre-envelope handshake) failed.
    Auth {
        endpoint: String,
        source: subc_transport::AuthError,
    },
    /// Reading or writing a subc frame failed.
    FrameIo(subc_transport::FrameIoError),
    /// Building an outbound subc frame failed.
    FrameBuild(subc_protocol::FrameBuildError),
    /// The writer task has closed.
    WriterClosed,
    /// `send_frame` gave up queueing a control frame because the writer queue
    /// stayed full.
    WriterBackpressureTimeout,
    /// Joining the writer task failed.
    WriterJoin(tokio::task::JoinError),
    /// A JSON payload could not be serialized or deserialized.
    Json(serde_json::Error),
    /// The daemon closed the connection before sending HelloAck.
    ClosedBeforeHelloAck,
    /// The daemon rejected ModuleHello. `body` is `None` when its error body
    /// could not be parsed.
    HelloRejected {
        body: Option<ErrorBody>,
    },
    /// A frame of type `ty` arrived where HelloAck was expected.
    UnexpectedFrame {
        ty: FrameType,
    },
}
3202
3203impl fmt::Display for SubcError {
3204    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3205        match self {
3206            Self::Runtime(e) => write!(f, "failed to build subc tokio runtime: {e}"),
3207            Self::ConnectionFile { path, source } => {
3208                write!(f, "failed to read subc connection file {path:?}: {source}")
3209            }
3210            Self::NoEndpoint { path } => {
3211                write!(f, "subc connection file {path:?} has no endpoints")
3212            }
3213            Self::InvalidEndpoint { path, endpoint } => {
3214                write!(
3215                    f,
3216                    "subc connection file {path:?} has invalid endpoint {endpoint}"
3217                )
3218            }
3219            Self::Connect { endpoint, source } => {
3220                write!(f, "failed to connect to subc endpoint {endpoint}: {source}")
3221            }
3222            Self::Auth { endpoint, source } => {
3223                write!(
3224                    f,
3225                    "failed to authenticate to subc endpoint {endpoint}: {source}"
3226                )
3227            }
3228            Self::FrameIo(e) => write!(f, "subc frame I/O error: {e}"),
3229            Self::FrameBuild(e) => write!(f, "subc frame build error: {e}"),
3230            Self::WriterClosed => write!(f, "subc writer task closed"),
3231            Self::WriterBackpressureTimeout => write!(
3232                f,
3233                "subc writer task stayed backpressured while sending a control frame"
3234            ),
3235            Self::WriterJoin(e) => write!(f, "subc writer task join error: {e}"),
3236            Self::Json(e) => write!(f, "subc JSON error: {e}"),
3237            Self::ClosedBeforeHelloAck => {
3238                write!(f, "subc daemon closed the connection before HelloAck")
3239            }
3240            Self::HelloRejected { body } => match body {
3241                Some(b) => write!(f, "subc rejected ModuleHello: {} ({})", b.code, b.message),
3242                None => write!(f, "subc rejected ModuleHello (unparseable error body)"),
3243            },
3244            Self::UnexpectedFrame { ty } => {
3245                write!(f, "subc sent unexpected frame in place of HelloAck: {ty:?}")
3246            }
3247        }
3248    }
3249}
3250
// Uses the default `source()` (always `None`): the `Display` impl already
// renders each wrapped cause inline, so exposing it again via `source()` would
// duplicate it in error-chain reporters.
impl std::error::Error for SubcError {}