Skip to main content

aft/
subc.rs

1//! subc daemon attach — transport edge.
2//!
3//! When AFT is launched as `aft --subc <connection-file>`, it does NOT run the
4//! standalone NDJSON-over-stdin loop. Instead it connects to a running subc
5//! daemon over loopback TCP, authenticates with the pre-envelope HMAC handshake
6//! (`subc-transport`), then speaks the subc frame protocol (`subc-protocol`):
7//! ModuleHello → HelloAck (register as a tool provider), then a channel-0
8//! control loop (Ping/Pong, RouteBind) plus route-channel tool calls.
9//!
10//! Concurrency: subc routes tool calls through the executor. The tokio
11//! edge never dispatches against `AppContext` inline; per-actor executor lanes
12//! own the reader/mutator epoch, while a writer task serializes outbound frames.
13
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::fmt;
16use std::net::{IpAddr, SocketAddr};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use serde::Deserialize;
22use serde_json::{json, Value};
23
24use crate::config::Config;
25use crate::config_resolve::ConfigTier;
26use crate::context::{App, AppContext, ProgressSender};
27use crate::executor::{Executor, Lane};
28use crate::log_ctx;
29use crate::path_identity::ProjectRootId;
30use crate::protocol::{ProgressKind, PushFrame, RawRequest, Response};
31use crate::runtime_drain;
32
33use subc_protocol::manifest::{
34    Bindings, Concurrency, ExecutionMode, IdentityBinding, IdentityScope, ModuleManifest,
35    ProviderRole, StorageBinding, StorageKind, StorageScope, Tool, TrustTier,
36};
37use subc_protocol::session::{ModuleControlRequest, ModuleControlResponse};
38use subc_protocol::{
39    ErrorBody, Flags, Frame, FrameType, ModuleHelloBody, Priority, PROTOCOL_VERSION,
40};
41use subc_transport::{authenticate_client, connection_file, read_frame, write_frame};
42use tokio::io::{AsyncRead, AsyncWrite};
43use tokio::net::TcpStream;
44use tokio::sync::{mpsc, oneshot, Notify};
45use tokio::task::JoinHandle;
46
/// Handshake budget. subc binds-before-spawn, so a reachable daemon authenticates
/// well within this; an unreachable/socket-stale daemon fails loud rather than
/// silently downgrading to standalone (the --subc contract).
const AUTH_DEADLINE: Duration = Duration::from_secs(5);

/// Correlation id for the initial ModuleHello (channel 0).
const HELLO_CORR: u64 = 1;

/// Per-session in-memory replay cap for must-deliver Push frames. This covers
/// detach/re-attach while AFT stays alive; cross-restart replay is phased later.
/// The same cap bounds each per-channel retry queue, because both buffers are
/// filled through `bounded_push_back`.
const PUSH_BUFFER_MAX_PER_KEY: usize = 256;

/// Bounded guard for control-frame sends. If the daemon stops reading and the
/// writer queue stays full, tear the subc edge down instead of stalling the
/// route loop indefinitely.
const CONTROL_SEND_TIMEOUT: Duration = Duration::from_millis(250);

/// Small bounded memory of completed task ids used to suppress stale lossy
/// long-running reminders that arrive after their reliable completion event.
const COMPLETED_TASK_SUPPRESSION_MAX: usize = 4096;

/// Route channel id as used for map keys: the wire-level `u16` channel widened
/// to `u32` (see `route_key`).
type RouteChannel = u32;
/// A push frame paired with the project root it was emitted for.
type PushEnvelope = (ProjectRootId, PushFrame);
/// Per-route-channel FIFO of reliable frames that hit writer backpressure,
/// each tagged with the replay key of the route it was addressed to.
type RetryBuffer = HashMap<RouteChannel, VecDeque<(ReplayKey, PushFrame)>>;
71
/// Cloneable pair of producer handles for outbound push frames.
#[derive(Clone)]
struct PushSenders {
    /// Bounded lane for lossy frames; producers use `try_send`, so a full
    /// queue drops the frame instead of blocking the emitter.
    lossy_tx: mpsc::Sender<PushEnvelope>,
    /// Unbounded lane for reliable (must-deliver) frames; sending never blocks.
    reliable_tx: mpsc::UnboundedSender<PushEnvelope>,
}
77
/// Bookkeeping record kept per project root.
// NOTE(review): the consumers of these fields are outside this excerpt; the
// field notes below describe only what the constructor and `touch` establish.
#[derive(Debug)]
struct RootMeta {
    /// Starts `false`; presumably set while a maintenance job is queued — confirm.
    maintenance_pending: bool,
    /// Instant supplied at construction, refreshed by `touch`.
    last_touched: Instant,
    /// Starts `false`; presumably mirrors a per-root configuration flag — confirm.
    diagnostics_on_edit: bool,
}
84
/// State recorded for a bind that has been started but not yet completed.
// NOTE(review): usage is outside this excerpt; field meanings are inferred
// from their names and should be confirmed against the RouteBind handler.
#[derive(Debug)]
struct PendingBind {
    /// Project root the bind targets.
    bind_root_id: ProjectRootId,
    /// Presumably true when this bind created the actor for the root — confirm.
    inserted_new_actor: bool,
    /// Presumably set when the bind is abandoned before it completes — confirm.
    cancelled: bool,
}
91
/// Result of a route bind, delivered to the module loop over the
/// control-completion channel.
// NOTE(review): the producer and consumer are outside this excerpt; the field
// notes are inferred from names and types and should be confirmed.
struct RouteBindCompletion {
    /// Wire-level (u16) channel the bind was requested on.
    route_channel: u16,
    /// Identity to associate with `route_channel`.
    identity: RouteIdentity,
    /// Project root the bind targets.
    bind_root_id: ProjectRootId,
    /// Presumably true when this bind created the actor for the root — confirm.
    inserted_new_actor: bool,
    /// Response produced by the configure step.
    configure_response: Response,
    /// Optional response from a drain step, when one ran.
    drain_response: Option<Response>,
    /// Presumably the resolved diagnostics-on-edit setting for the root — confirm.
    diagnostics_on_edit: bool,
    /// Presumably the protocol version, correlation id and flags of the
    /// originating request frame, kept so the reply can echo them — confirm.
    ver: u8,
    corr: u64,
    flags: Flags,
}
104
/// Identity attached to a bound route channel (the value type of the route map).
#[derive(Debug, Clone)]
struct RouteIdentity {
    /// Project root id; used to index the root → channels map and as part of
    /// the replay key.
    root: ProjectRootId,
    /// Filesystem path of the project root.
    // NOTE(review): not read anywhere in this excerpt — confirm its consumer.
    project_root: PathBuf,
    /// Harness name; part of the replay key.
    harness: String,
    /// Session id; compared against a push frame's session when selecting
    /// target channels, and part of the replay key.
    session: String,
}
112
/// Hashable (root, harness, session) triple that keys the buffer of reliable
/// push frames held for later replay.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ReplayKey {
    /// Project root the frames belong to.
    root: ProjectRootId,
    /// Harness name of the owning route identity.
    harness: String,
    /// Session id of the owning route identity.
    session: String,
}
119
120impl ReplayKey {
121    fn from_identity(identity: &RouteIdentity) -> Self {
122        Self {
123            root: identity.root.clone(),
124            harness: identity.harness.clone(),
125            session: identity.session.clone(),
126        }
127    }
128}
129
130#[derive(Debug, Default)]
131struct CompletedTaskIds {
132    order: VecDeque<String>,
133    set: HashSet<String>,
134}
135
136impl CompletedTaskIds {
137    fn remember(&mut self, task_id: &str) {
138        if self.set.contains(task_id) {
139            return;
140        }
141        if self.order.len() >= COMPLETED_TASK_SUPPRESSION_MAX {
142            if let Some(evicted) = self.order.pop_front() {
143                self.set.remove(&evicted);
144            }
145        }
146        let task_id = task_id.to_string();
147        self.order.push_back(task_id.clone());
148        self.set.insert(task_id);
149    }
150
151    fn contains(&self, task_id: &str) -> bool {
152        self.set.contains(task_id)
153    }
154}
155
156impl RootMeta {
157    fn new(now: Instant) -> Self {
158        Self {
159            maintenance_pending: false,
160            last_touched: now,
161            diagnostics_on_edit: false,
162        }
163    }
164
165    fn touch(&mut self) {
166        self.last_touched = Instant::now();
167    }
168}
169
170fn route_key(channel: u16) -> RouteChannel {
171    RouteChannel::from(channel)
172}
173
174fn remove_root_channel(
175    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
176    root: &ProjectRootId,
177    channel: RouteChannel,
178) {
179    let remove_root = if let Some(channels) = root_channels.get_mut(root) {
180        channels.remove(&channel);
181        channels.is_empty()
182    } else {
183        false
184    };
185    if remove_root {
186        root_channels.remove(root);
187    }
188}
189
190fn remove_route_channel(
191    routes: &mut HashMap<RouteChannel, RouteIdentity>,
192    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
193    channel: RouteChannel,
194) -> Option<RouteIdentity> {
195    let removed = routes.remove(&channel);
196    if let Some(identity) = &removed {
197        remove_root_channel(root_channels, &identity.root, channel);
198    }
199    removed
200}
201
202fn insert_route_channel(
203    routes: &mut HashMap<RouteChannel, RouteIdentity>,
204    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
205    channel: RouteChannel,
206    identity: RouteIdentity,
207) {
208    if let Some(previous) = routes.insert(channel, identity.clone()) {
209        remove_root_channel(root_channels, &previous.root, channel);
210    }
211    root_channels
212        .entry(identity.root.clone())
213        .or_default()
214        .insert(channel);
215}
216
217fn remember_session_identity(
218    session_identity: &mut HashMap<(ProjectRootId, String), String>,
219    identity: &RouteIdentity,
220) {
221    // Retained after route Goodbye so reliable session-scoped frames emitted while
222    // the session is detached can still be keyed by the full (root,harness,session)
223    // replay triple. The idle-TTL actor reaper is responsible for pruning stale
224    // identities/buffers once an actor is evicted.
225    session_identity.insert(
226        (identity.root.clone(), identity.session.clone()),
227        identity.harness.clone(),
228    );
229}
230
231fn replay_key_for_session(
232    session_identity: &HashMap<(ProjectRootId, String), String>,
233    root: &ProjectRootId,
234    session: &str,
235) -> Option<ReplayKey> {
236    let harness = session_identity.get(&(root.clone(), session.to_string()))?;
237    Some(ReplayKey {
238        root: root.clone(),
239        harness: harness.clone(),
240        session: session.to_string(),
241    })
242}
243
244fn frame_session(frame: &PushFrame) -> Option<&str> {
245    match frame {
246        PushFrame::BashCompleted(completed) => Some(completed.session_id.as_str()),
247        PushFrame::BashLongRunning(long_running) => Some(long_running.session_id.as_str()),
248        PushFrame::BashPatternMatch(pattern_match) => Some(pattern_match.session_id.as_str()),
249        PushFrame::ConfigureWarnings(warnings) => warnings.session_id.as_deref(),
250        PushFrame::StatusChanged(status) => status.session_id.as_deref(),
251        PushFrame::Progress(_) => None,
252    }
253}
254
255fn frame_is_reliable(frame: &PushFrame) -> bool {
256    matches!(
257        frame,
258        PushFrame::BashCompleted(_)
259            | PushFrame::BashPatternMatch(_)
260            | PushFrame::ConfigureWarnings(_)
261    )
262}
263
264fn completed_task_id(frame: &PushFrame) -> Option<&str> {
265    match frame {
266        PushFrame::BashCompleted(completed) => Some(completed.task_id.as_str()),
267        _ => None,
268    }
269}
270
271fn long_running_task_id(frame: &PushFrame) -> Option<&str> {
272    match frame {
273        PushFrame::BashLongRunning(long_running) => Some(long_running.task_id.as_str()),
274        _ => None,
275    }
276}
277
278fn should_drop_lossy_push(completed_tasks: &CompletedTaskIds, frame: &PushFrame) -> bool {
279    long_running_task_id(frame).is_some_and(|task_id| completed_tasks.contains(task_id))
280}
281
282fn progress_sender_for_root(push_senders: PushSenders, root_id: ProjectRootId) -> ProgressSender {
283    Arc::new(Box::new(move |frame: PushFrame| {
284        // Emitters can run on executor workers, maintenance jobs, watcher drains,
285        // semantic refresh workers, or bg-bash watchdog threads. Never block any
286        // of them on subc routing/backpressure: reliable frames take an
287        // unbounded non-blocking lane; lossy frames stay bounded and coalesced.
288        if frame_is_reliable(&frame) {
289            let _ = push_senders.reliable_tx.send((root_id.clone(), frame));
290        } else {
291            let _ = push_senders.lossy_tx.try_send((root_id.clone(), frame));
292        }
293    }))
294}
295
/// Hashable mirror of `ProgressKind`, used as part of the coalescing key for
/// progress frames.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum LossyProgressKind {
    /// Mirrors `ProgressKind::Stdout`.
    Stdout,
    /// Mirrors `ProgressKind::Stderr`.
    Stderr,
}
301
302impl From<&ProgressKind> for LossyProgressKind {
303    fn from(kind: &ProgressKind) -> Self {
304        match kind {
305            ProgressKind::Stdout => Self::Stdout,
306            ProgressKind::Stderr => Self::Stderr,
307        }
308    }
309}
310
/// Coalescing identity of a lossy push frame: within one batch, only the
/// newest frame per (root, key) survives.
// NOTE(review): `StatusChanged` carries no session discriminator, so status
// frames addressed to different sessions under the same root collapse to the
// newest one — confirm that is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum LossyPushKey {
    /// Progress output, distinguished per request and per output stream.
    Progress {
        request_id: String,
        kind: LossyProgressKind,
    },
    /// Status update; one slot per root.
    StatusChanged,
    /// Long-running reminder, distinguished per task.
    BashLongRunning {
        task_id: String,
    },
}
322
323fn lossy_push_key(frame: &PushFrame) -> Option<LossyPushKey> {
324    match frame {
325        PushFrame::Progress(progress) => Some(LossyPushKey::Progress {
326            request_id: progress.request_id.clone(),
327            kind: LossyProgressKind::from(&progress.kind),
328        }),
329        PushFrame::StatusChanged(_) => Some(LossyPushKey::StatusChanged),
330        PushFrame::BashLongRunning(long_running) => Some(LossyPushKey::BashLongRunning {
331            task_id: long_running.task_id.clone(),
332        }),
333        PushFrame::BashCompleted(_)
334        | PushFrame::BashPatternMatch(_)
335        | PushFrame::ConfigureWarnings(_) => None,
336    }
337}
338
339fn coalesce_push_batch(batch: Vec<(ProjectRootId, PushFrame)>) -> Vec<(ProjectRootId, PushFrame)> {
340    let mut slots: Vec<Option<(ProjectRootId, PushFrame)>> = Vec::with_capacity(batch.len());
341    let mut latest_lossy: HashMap<(ProjectRootId, LossyPushKey), usize> = HashMap::new();
342
343    for (root, frame) in batch {
344        if let Some(lossy_key) = lossy_push_key(&frame) {
345            let map_key = (root.clone(), lossy_key);
346            if let Some(previous_index) = latest_lossy.insert(map_key, slots.len()) {
347                slots[previous_index] = None;
348            }
349        }
350        slots.push(Some((root, frame)));
351    }
352
353    slots.into_iter().flatten().collect()
354}
355
/// Outcome counters for fanning one push frame out to the bound routes.
/// The default value (both counters zero) means no channel matched.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct FanOutResult {
    /// Channels matching the frame's project/session scope. Reliable Push frames
    /// that match a channel but hit writer backpressure are held in retry_buffer
    /// instead of being mistaken for detach replay.
    matched_channels: usize,
    /// Frames accepted by the writer queue immediately. Lossy frames that are not
    /// accepted are dropped; reliable frames are retried on transient backpressure.
    sent_frames: usize,
}
366
/// Result of one non-blocking attempt to enqueue a Push frame on the writer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushSendOutcome {
    /// The writer queue accepted the frame.
    Sent,
    /// The writer queue is full; the same frame may succeed later.
    Backpressure,
    /// The frame can never be sent: the channel id does not fit a `u16`, the
    /// frame could not be serialized or built, or the writer queue is closed.
    PermanentFailure,
}
373
374fn try_send_push_body(
375    writer_tx: &mpsc::Sender<Frame>,
376    channel: RouteChannel,
377    body: &[u8],
378) -> PushSendOutcome {
379    let Ok(route_channel) = u16::try_from(channel) else {
380        log::warn!("subc attach: invalid route channel {channel} for Push fan-out");
381        return PushSendOutcome::PermanentFailure;
382    };
383    let push_frame = match Frame::build_with_version(
384        PROTOCOL_VERSION,
385        FrameType::Push,
386        control_flags(),
387        route_channel,
388        0,
389        body.to_vec(),
390    ) {
391        Ok(frame) => frame,
392        Err(error) => {
393            log::warn!("subc attach: failed to build Push frame: {error}");
394            return PushSendOutcome::PermanentFailure;
395        }
396    };
397    match writer_tx.try_send(push_frame) {
398        Ok(()) => PushSendOutcome::Sent,
399        Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
400        Err(mpsc::error::TrySendError::Closed(_)) => {
401            log::warn!("subc attach: writer closed while sending Push frame");
402            PushSendOutcome::PermanentFailure
403        }
404    }
405}
406
407fn try_send_push_frame(
408    writer_tx: &mpsc::Sender<Frame>,
409    channel: RouteChannel,
410    frame: &PushFrame,
411) -> PushSendOutcome {
412    let body = match serde_json::to_vec(frame) {
413        Ok(body) => body,
414        Err(error) => {
415            log::warn!("subc attach: failed to serialize PushFrame: {error}");
416            return PushSendOutcome::PermanentFailure;
417        }
418    };
419    try_send_push_body(writer_tx, channel, &body)
420}
421
422fn bounded_push_back<T>(queue: &mut VecDeque<T>, item: T) {
423    if queue.len() >= PUSH_BUFFER_MAX_PER_KEY {
424        queue.pop_front();
425    }
426    queue.push_back(item);
427}
428
429fn buffer_push_frame(
430    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
431    key: ReplayKey,
432    frame: PushFrame,
433) {
434    bounded_push_back(push_buffer.entry(key).or_default(), frame);
435}
436
437fn buffer_retry_frame(
438    retry_buffer: &mut RetryBuffer,
439    channel: RouteChannel,
440    key: ReplayKey,
441    frame: PushFrame,
442) {
443    bounded_push_back(retry_buffer.entry(channel).or_default(), (key, frame));
444}
445
446fn migrate_retry_buffer_to_push_buffer(
447    retry_buffer: &mut RetryBuffer,
448    channel: RouteChannel,
449    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
450) -> usize {
451    let Some(frames) = retry_buffer.remove(&channel) else {
452        return 0;
453    };
454    let migrated = frames.len();
455    for (key, frame) in frames {
456        buffer_push_frame(push_buffer, key, frame);
457    }
458    migrated
459}
460
461fn replay_buffered_push_frames(
462    writer_tx: &mpsc::Sender<Frame>,
463    channel: RouteChannel,
464    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
465    key: &ReplayKey,
466) -> usize {
467    let mut sent = 0;
468    let remove_empty;
469
470    {
471        let Some(queue) = push_buffer.get_mut(key) else {
472            return 0;
473        };
474
475        while let Some(frame) = queue.pop_front() {
476            match try_send_push_frame(writer_tx, channel, &frame) {
477                PushSendOutcome::Sent => sent += 1,
478                PushSendOutcome::Backpressure => {
479                    queue.push_front(frame);
480                    break;
481                }
482                PushSendOutcome::PermanentFailure => {
483                    log::warn!(
484                        "subc attach: dropping buffered reliable Push for root {} harness {} session {} after permanent send failure",
485                        key.root.as_path().display(),
486                        key.harness,
487                        key.session
488                    );
489                }
490            }
491        }
492
493        remove_empty = queue.is_empty();
494    }
495
496    if remove_empty {
497        push_buffer.remove(key);
498    }
499
500    sent
501}
502
503fn drain_retry_buffer_for_channel(
504    writer_tx: &mpsc::Sender<Frame>,
505    channel: RouteChannel,
506    retry_buffer: &mut RetryBuffer,
507) -> usize {
508    let mut sent = 0;
509    let remove_empty;
510
511    {
512        let Some(queue) = retry_buffer.get_mut(&channel) else {
513            return 0;
514        };
515
516        while let Some((key, frame)) = queue.pop_front() {
517            match try_send_push_frame(writer_tx, channel, &frame) {
518                PushSendOutcome::Sent => sent += 1,
519                PushSendOutcome::Backpressure => {
520                    queue.push_front((key, frame));
521                    break;
522                }
523                PushSendOutcome::PermanentFailure => {
524                    log::warn!(
525                        "subc attach: dropping retry-buffered reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
526                        key.root.as_path().display(),
527                        key.harness,
528                        key.session
529                    );
530                }
531            }
532        }
533
534        remove_empty = queue.is_empty();
535    }
536
537    if remove_empty {
538        retry_buffer.remove(&channel);
539    }
540
541    sent
542}
543
544fn drain_retry_buffers_for_bound_routes(
545    writer_tx: &mpsc::Sender<Frame>,
546    routes: &HashMap<RouteChannel, RouteIdentity>,
547    retry_buffer: &mut RetryBuffer,
548) -> usize {
549    let channels: Vec<RouteChannel> = routes.keys().copied().collect();
550    channels
551        .into_iter()
552        .map(|channel| drain_retry_buffer_for_channel(writer_tx, channel, retry_buffer))
553        .sum()
554}
555
556fn matching_route_channels(
557    routes: &HashMap<RouteChannel, RouteIdentity>,
558    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
559    root: &ProjectRootId,
560    frame: &PushFrame,
561) -> Vec<RouteChannel> {
562    let Some(channels) = root_channels.get(root) else {
563        return Vec::new();
564    };
565
566    let session = frame_session(frame);
567    channels
568        .iter()
569        .copied()
570        .filter(|channel| match session {
571            Some(session) => routes
572                .get(channel)
573                .is_some_and(|identity| identity.session == session),
574            None => true,
575        })
576        .collect()
577}
578
579fn buffer_detached_reliable_push_frame(
580    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
581    session_identity: &HashMap<(ProjectRootId, String), String>,
582    root: &ProjectRootId,
583    frame: &PushFrame,
584) {
585    let Some(session) = frame_session(frame) else {
586        log::warn!(
587            "subc attach: dropping reliable project-scoped Push for root {} because no route is bound",
588            root.as_path().display()
589        );
590        return;
591    };
592
593    if let Some(key) = replay_key_for_session(session_identity, root, session) {
594        buffer_push_frame(push_buffer, key, frame.clone());
595    } else {
596        log::warn!(
597            "subc attach: dropping reliable Push for root {} session {} because no retained harness identity is known",
598            root.as_path().display(),
599            session
600        );
601    }
602}
603
604fn fan_out_lossy_push_frame(
605    writer_tx: &mpsc::Sender<Frame>,
606    routes: &HashMap<RouteChannel, RouteIdentity>,
607    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
608    root: &ProjectRootId,
609    frame: &PushFrame,
610) -> FanOutResult {
611    let matching_channels = matching_route_channels(routes, root_channels, root, frame);
612    let matched_channels = matching_channels.len();
613    if matched_channels == 0 {
614        return FanOutResult::default();
615    }
616
617    let body = match serde_json::to_vec(frame) {
618        Ok(body) => body,
619        Err(error) => {
620            log::warn!("subc attach: failed to serialize PushFrame for fan-out: {error}");
621            return FanOutResult {
622                matched_channels,
623                sent_frames: 0,
624            };
625        }
626    };
627
628    let sent_frames = matching_channels
629        .into_iter()
630        .filter(|&channel| {
631            matches!(
632                try_send_push_body(writer_tx, channel, &body),
633                PushSendOutcome::Sent
634            )
635        })
636        .count();
637
638    FanOutResult {
639        matched_channels,
640        sent_frames,
641    }
642}
643
644fn fan_out_reliable_push_frame(
645    writer_tx: &mpsc::Sender<Frame>,
646    routes: &HashMap<RouteChannel, RouteIdentity>,
647    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
648    session_identity: &HashMap<(ProjectRootId, String), String>,
649    retry_buffer: &mut RetryBuffer,
650    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
651    root: &ProjectRootId,
652    frame: &PushFrame,
653) -> FanOutResult {
654    let matching_channels = matching_route_channels(routes, root_channels, root, frame);
655    let matched_channels = matching_channels.len();
656    if matched_channels == 0 {
657        buffer_detached_reliable_push_frame(push_buffer, session_identity, root, frame);
658        return FanOutResult::default();
659    }
660
661    let mut sent_frames = 0;
662    for channel in matching_channels {
663        let Some(identity) = routes.get(&channel) else {
664            log::warn!(
665                "subc attach: dropping reliable Push for stale route channel {channel} with no route identity"
666            );
667            continue;
668        };
669        let key = ReplayKey::from_identity(identity);
670
671        if retry_buffer
672            .get(&channel)
673            .is_some_and(|queue| !queue.is_empty())
674        {
675            buffer_retry_frame(retry_buffer, channel, key, frame.clone());
676            continue;
677        }
678
679        match try_send_push_frame(writer_tx, channel, frame) {
680            PushSendOutcome::Sent => sent_frames += 1,
681            PushSendOutcome::Backpressure => {
682                buffer_retry_frame(retry_buffer, channel, key, frame.clone());
683            }
684            PushSendOutcome::PermanentFailure => {
685                log::warn!(
686                    "subc attach: dropping reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
687                    key.root.as_path().display(),
688                    key.harness,
689                    key.session
690                );
691            }
692        }
693    }
694
695    FanOutResult {
696        matched_channels,
697        sent_frames,
698    }
699}
700
701fn process_reliable_push_frame(
702    writer_tx: &mpsc::Sender<Frame>,
703    routes: &HashMap<RouteChannel, RouteIdentity>,
704    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
705    session_identity: &HashMap<(ProjectRootId, String), String>,
706    retry_buffer: &mut RetryBuffer,
707    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
708    completed_tasks: &mut CompletedTaskIds,
709    root: ProjectRootId,
710    frame: PushFrame,
711) {
712    if let Some(task_id) = completed_task_id(&frame) {
713        completed_tasks.remember(task_id);
714    }
715    let _ = fan_out_reliable_push_frame(
716        writer_tx,
717        routes,
718        root_channels,
719        session_identity,
720        retry_buffer,
721        push_buffer,
722        &root,
723        &frame,
724    );
725}
726
727fn process_lossy_push_frame(
728    writer_tx: &mpsc::Sender<Frame>,
729    routes: &HashMap<RouteChannel, RouteIdentity>,
730    root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
731    completed_tasks: &CompletedTaskIds,
732    root: ProjectRootId,
733    frame: PushFrame,
734) {
735    if should_drop_lossy_push(completed_tasks, &frame) {
736        if let Some(task_id) = long_running_task_id(&frame) {
737            log::debug!(
738                "subc attach: dropping stale BashLongRunning Push for completed task {task_id}"
739            );
740        }
741        return;
742    }
743
744    let _ = fan_out_lossy_push_frame(writer_tx, routes, root_channels, &root, &frame);
745}
746
/// Sync command dispatch, passed in from `main` (the binary owns the command
/// table). Invoked only inside executor jobs in subc mode.
///
/// A plain `fn` pointer (not a closure): it takes the raw request plus the
/// context to run against and returns the response.
pub type DispatchFn = fn(RawRequest, &AppContext) -> Response;
750
751/// Entry point for `aft --subc <connection-file>`. Synchronous on the outside;
752/// owns an isolated current-thread tokio runtime for the async transport.
753/// Returns `Err` (fail-loud) on any connect/auth/protocol failure — we never
754/// fall back to the standalone loop, to avoid split-brain index state.
755pub fn run_subc_mode(
756    connection_file_path: &Path,
757    ctx: Arc<AppContext>,
758    executor: Arc<Executor>,
759    dispatch: DispatchFn,
760    user_config_path: Option<PathBuf>,
761) -> Result<(), SubcError> {
762    // Production NEVER allows non-manifest tool names on route channels: AFT
763    // fails closed and does not trust subc to enforce the manifest. The
764    // test-only harness sets this through `run_subc_mode_for_test`.
765    run_subc_mode_inner(
766        connection_file_path,
767        ctx,
768        executor,
769        dispatch,
770        user_config_path,
771        false,
772    )
773}
774
775fn run_subc_mode_inner(
776    connection_file_path: &Path,
777    ctx: Arc<AppContext>,
778    executor: Arc<Executor>,
779    dispatch: DispatchFn,
780    user_config_path: Option<PathBuf>,
781    allow_native_passthrough: bool,
782) -> Result<(), SubcError> {
783    let runtime = tokio::runtime::Builder::new_current_thread()
784        .enable_all()
785        .build()
786        .map_err(SubcError::Runtime)?;
787
788    let executor_for_loop = Arc::clone(&executor);
789    let loop_result = runtime.block_on(async move {
790        let shared_app = ctx.app();
791        drop(ctx);
792        let stream = connect_and_authenticate(connection_file_path).await?;
793        log::info!(
794            "subc attach: authenticated to daemon via {}",
795            connection_file_path.display()
796        );
797        let (read_half, write_half) = tokio::io::split(stream);
798        run_module_loop(
799            read_half,
800            write_half,
801            shared_app,
802            executor_for_loop,
803            dispatch,
804            user_config_path,
805            allow_native_passthrough,
806        )
807        .await
808    });
809
810    for actor_ctx in executor.actor_contexts() {
811        actor_ctx.lsp().shutdown_all();
812        actor_ctx.bash_background().detach();
813    }
814
815    loop_result
816}
817
818/// Test-only entry that enables the non-manifest native-command passthrough on
819/// route channels. Integration tests drive synthetic native commands (`glob`,
820/// `callers`, `subc_test_echo_session`, …) through the executor to exercise
821/// mechanics; production callers use [`run_subc_mode`], which fails closed.
822#[doc(hidden)]
823pub fn run_subc_mode_for_test(
824    connection_file_path: &Path,
825    ctx: Arc<AppContext>,
826    executor: Arc<Executor>,
827    dispatch: DispatchFn,
828    user_config_path: Option<PathBuf>,
829) -> Result<(), SubcError> {
830    run_subc_mode_inner(
831        connection_file_path,
832        ctx,
833        executor,
834        dispatch,
835        user_config_path,
836        true,
837    )
838}
839
840/// Read the connection file → resolve the first endpoint → TCP connect → HMAC
841/// handshake. Mirrors the reference `fake-aft-stub::connect_to_subc`.
842async fn connect_and_authenticate(connection_file_path: &Path) -> Result<TcpStream, SubcError> {
843    let conn = connection_file::read(connection_file_path).map_err(|source| {
844        SubcError::ConnectionFile {
845            path: connection_file_path.to_path_buf(),
846            source,
847        }
848    })?;
849
850    let endpoint = conn
851        .endpoints
852        .first()
853        .ok_or_else(|| SubcError::NoEndpoint {
854            path: connection_file_path.to_path_buf(),
855        })?;
856    let endpoint_label = format!("{}:{}", endpoint.host, endpoint.port);
857    let ip = endpoint
858        .host
859        .parse::<IpAddr>()
860        .map_err(|_| SubcError::InvalidEndpoint {
861            path: connection_file_path.to_path_buf(),
862            endpoint: endpoint_label.clone(),
863        })?;
864    let addr = SocketAddr::new(ip, endpoint.port);
865
866    let mut stream = TcpStream::connect(addr)
867        .await
868        .map_err(|source| SubcError::Connect {
869            endpoint: endpoint_label.clone(),
870            source,
871        })?;
872
873    authenticate_client(&mut stream, &conn, AUTH_DEADLINE)
874        .await
875        .map_err(|source| SubcError::Auth {
876            endpoint: endpoint_label,
877            source,
878        })?;
879
880    Ok(stream)
881}
882
883/// ModuleHello → HelloAck → control/route loop. Runs until the daemon closes
884/// the connection (EOF), sends channel-0 Goodbye, or a fatal mutating executor
885/// response requests whole-connection teardown.
886async fn run_module_loop<R, W>(
887    mut read: R,
888    mut write: W,
889    shared_app: Arc<App>,
890    executor: Arc<Executor>,
891    dispatch: DispatchFn,
892    user_config_path: Option<PathBuf>,
893    allow_native_passthrough: bool,
894) -> Result<(), SubcError>
895where
896    R: AsyncRead + Unpin + Send + 'static,
897    W: AsyncWrite + Unpin + Send + 'static,
898{
899    // ModuleHello: register as a tool provider. control_ops:None = full baseline.
900    // Echo the one-time launch nonce the daemon injected via SUBC_LAUNCH_NONCE so a
901    // reserved module_id's HELLO is accepted; absent for non-reserved/self-connect.
902    let hello = ModuleHelloBody {
903        manifest: build_manifest(),
904        protocol_ver: PROTOCOL_VERSION,
905        control_ops: None,
906        launch_nonce: std::env::var("SUBC_LAUNCH_NONCE").ok(),
907    };
908    let hello_frame = Frame::build(
909        FrameType::Hello,
910        control_flags(),
911        0,
912        HELLO_CORR,
913        serde_json::to_vec(&hello).map_err(SubcError::Json)?,
914    )
915    .map_err(SubcError::FrameBuild)?;
916    write_frame(&mut write, &hello_frame)
917        .await
918        .map_err(SubcError::FrameIo)?;
919
920    // Expect HelloAck (registered) or a channel-0 Error (manifest/version reject).
921    match read_frame(&mut read).await.map_err(SubcError::FrameIo)? {
922        None => return Err(SubcError::ClosedBeforeHelloAck),
923        Some(frame) => match frame.header.ty {
924            FrameType::HelloAck => {
925                log::info!("subc attach: registered (HelloAck received)");
926            }
927            FrameType::Error => {
928                let body = serde_json::from_slice::<ErrorBody>(&frame.body).ok();
929                return Err(SubcError::HelloRejected { body });
930            }
931            other => return Err(SubcError::UnexpectedFrame { ty: other }),
932        },
933    }
934
935    let (writer_tx, writer_rx) = mpsc::channel::<Frame>(256);
936    let writer_task = spawn_writer_task(write, writer_rx);
937    // `read_frame` is NOT cancellation-safe, so it must never sit directly inside
938    // the `select!` below: a drain-interval tick (or shutdown) firing while a
939    // frame is mid-transit would drop the partially-consumed bytes and desync the
940    // stream (the next read would parse a body byte as a frame header). A
941    // dedicated reader task owns the socket, reads whole frames sequentially, and
942    // forwards them over a channel; the loop selects on the cancel-safe `recv()`.
943    let (reader_tx, mut reader_rx) = mpsc::channel::<Result<Frame, SubcError>>(256);
944    let reader_task = spawn_reader_task(read, reader_tx);
945    let shutdown = Arc::new(Notify::new());
946    let mut drain_interval = tokio::time::interval(Duration::from_millis(250));
947    let (maintenance_tx, mut maintenance_rx) = mpsc::channel::<(ProjectRootId, Response)>(256);
948    let (control_completion_tx, mut control_completion_rx) =
949        mpsc::channel::<RouteBindCompletion>(256);
950    let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1024);
951    let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
952    let push_senders = PushSenders {
953        lossy_tx,
954        reliable_tx,
955    };
956    let mut routes: HashMap<RouteChannel, RouteIdentity> = HashMap::new();
957    let mut root_channels: HashMap<ProjectRootId, HashSet<RouteChannel>> = HashMap::new();
958    let mut session_identity: HashMap<(ProjectRootId, String), String> = HashMap::new();
959    let mut push_buffer: HashMap<ReplayKey, VecDeque<PushFrame>> = HashMap::new();
960    let mut retry_buffer: RetryBuffer = HashMap::new();
961    let mut completed_tasks = CompletedTaskIds::default();
962    let mut live_roots: HashMap<ProjectRootId, RootMeta> = HashMap::new();
963    let mut pending_binds: HashMap<RouteChannel, PendingBind> = HashMap::new();
964
965    let loop_result: Result<(), SubcError> = loop {
966        tokio::select! {
967            _ = shutdown.notified() => {
968                log::warn!("subc attach: fatal executor response requested teardown");
969                break Ok(());
970            }
971            maybe_frame = reader_rx.recv() => {
972                let frame = match maybe_frame {
973                    None => {
974                        log::info!("subc attach: daemon closed connection");
975                        break Ok(());
976                    }
977                    Some(Err(error)) => break Err(error),
978                    Some(Ok(frame)) => frame,
979                };
980
981                match frame.header.ty {
982                    FrameType::Ping if frame.header.channel == 0 => {
983                        let pong = match Frame::build_with_version(
984                            frame.header.ver,
985                            FrameType::Pong,
986                            frame.header.flags,
987                            0,
988                            frame.header.corr,
989                            Vec::new(),
990                        ) {
991                            Ok(pong) => pong,
992                            Err(error) => break Err(SubcError::FrameBuild(error)),
993                        };
994                        if let Err(error) = send_frame(&writer_tx, pong).await {
995                            break Err(error);
996                        }
997                    }
998                    FrameType::Goodbye if frame.header.channel == 0 => {
999                        log::info!("subc attach: received channel-0 Goodbye");
1000                        break Ok(());
1001                    }
1002                    FrameType::Goodbye => {
1003                        let channel = route_key(frame.header.channel);
1004                        if let Some(pending) = pending_binds.get_mut(&channel) {
1005                            pending.cancelled = true;
1006                            log::debug!(
1007                                "subc attach: cancelled pending RouteBind for route {} on Goodbye",
1008                                frame.header.channel
1009                            );
1010                        }
1011                        let migrated = migrate_retry_buffer_to_push_buffer(
1012                            &mut retry_buffer,
1013                            channel,
1014                            &mut push_buffer,
1015                        );
1016                        if let Some(identity) = remove_route_channel(&mut routes, &mut root_channels, channel) {
1017                            if migrated > 0 {
1018                                log::debug!(
1019                                    "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from route {} into detach replay",
1020                                    frame.header.channel
1021                                );
1022                            }
1023                            if let Some(meta) = live_roots.get_mut(&identity.root) {
1024                                let idle_for = meta.last_touched.elapsed();
1025                                meta.touch();
1026                                log::debug!(
1027                                    "subc attach: route {} torn down for root {} harness {} session {} (last touched {:?} ago)",
1028                                    frame.header.channel,
1029                                    identity.root.as_path().display(),
1030                                    identity.harness,
1031                                    identity.session,
1032                                    idle_for
1033                                );
1034                            } else {
1035                                log::debug!(
1036                                    "subc attach: route {} torn down for root {} harness {} session {}",
1037                                    frame.header.channel,
1038                                    identity.root.as_path().display(),
1039                                    identity.harness,
1040                                    identity.session
1041                                );
1042                            }
1043                        } else {
1044                            if migrated > 0 {
1045                                log::debug!(
1046                                    "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from unbound route {} into detach replay",
1047                                    frame.header.channel
1048                                );
1049                            }
1050                            log::debug!("subc attach: unbound route {} torn down", frame.header.channel);
1051                        }
1052                    }
1053                    FrameType::Request if frame.header.channel == 0 => {
1054                        if let Err(error) = handle_control_request(
1055                            &writer_tx,
1056                            &frame,
1057                            &shared_app,
1058                            &executor,
1059                            &mut live_roots,
1060                            &mut pending_binds,
1061                            &control_completion_tx,
1062                            &push_senders,
1063                            dispatch,
1064                            user_config_path.as_deref(),
1065                        )
1066                        .await
1067                        {
1068                            break Err(error);
1069                        }
1070                    }
1071                    FrameType::Request => {
1072                        if let Err(error) = handle_tool_call(
1073                            &writer_tx,
1074                            &frame,
1075                            &routes,
1076                            &pending_binds,
1077                            &mut live_roots,
1078                            &executor,
1079                            &shutdown,
1080                            dispatch,
1081                            allow_native_passthrough,
1082                        )
1083                        .await
1084                        {
1085                            break Err(error);
1086                        }
1087                    }
1088                    // Cancel/Push/etc. are not yet handled: in-flight tool-call
1089                    // cancellation is not implemented, so these frame types are
1090                    // ignored rather than acted on.
1091                    _ => {}
1092                }
1093            }
1094            Some((root_id, frame)) = reliable_rx.recv() => {
1095                // Drain reliable frames in FIFO order. They are intentionally not
1096                // coalesced: completion, pattern-match, and warning frames are
1097                // must-deliver events.
1098                let mut batch = vec![(root_id, frame)];
1099                while let Ok(item) = reliable_rx.try_recv() {
1100                    batch.push(item);
1101                }
1102
1103                for (root, frame) in batch {
1104                    process_reliable_push_frame(
1105                        &writer_tx,
1106                        &routes,
1107                        &root_channels,
1108                        &session_identity,
1109                        &mut retry_buffer,
1110                        &mut push_buffer,
1111                        &mut completed_tasks,
1112                        root,
1113                        frame,
1114                    );
1115                }
1116            }
1117            Some((root_id, frame)) = lossy_rx.recv() => {
1118                // If both lanes are ready, process any already-queued reliable
1119                // completions first so a following stale BashLongRunning frame can
1120                // be suppressed even if select! happened to wake on the lossy lane.
1121                while let Ok((reliable_root, reliable_frame)) = reliable_rx.try_recv() {
1122                    process_reliable_push_frame(
1123                        &writer_tx,
1124                        &routes,
1125                        &root_channels,
1126                        &session_identity,
1127                        &mut retry_buffer,
1128                        &mut push_buffer,
1129                        &mut completed_tasks,
1130                        reliable_root,
1131                        reliable_frame,
1132                    );
1133                }
1134
1135                // Drain the currently queued burst in one loop turn so lossy
1136                // status/progress classes coalesce before reaching subc's shared
1137                // egress queue.
1138                let mut batch = vec![(root_id, frame)];
1139                while let Ok(item) = lossy_rx.try_recv() {
1140                    batch.push(item);
1141                }
1142
1143                for (root, frame) in coalesce_push_batch(batch) {
1144                    process_lossy_push_frame(
1145                        &writer_tx,
1146                        &routes,
1147                        &root_channels,
1148                        &completed_tasks,
1149                        root,
1150                        frame,
1151                    );
1152                }
1153            }
1154            Some(completion) = control_completion_rx.recv() => {
1155                if let Err(error) = handle_route_bind_completion(
1156                    &writer_tx,
1157                    completion,
1158                    &mut routes,
1159                    &mut root_channels,
1160                    &mut session_identity,
1161                    &mut push_buffer,
1162                    &mut live_roots,
1163                    &mut pending_binds,
1164                    &executor,
1165                    &shutdown,
1166                )
1167                .await
1168                {
1169                    break Err(error);
1170                }
1171            }
1172            Some((root_id, response)) = maintenance_rx.recv() => {
1173                if let Some(meta) = live_roots.get_mut(&root_id) {
1174                    meta.maintenance_pending = false;
1175                }
1176                if response_is_internal_error(&response) {
1177                    signal_fatal_teardown(&writer_tx, None, PROTOCOL_VERSION, 0, &shutdown).await;
1178                }
1179            }
1180            _ = drain_interval.tick() => {
1181                let retried = drain_retry_buffers_for_bound_routes(
1182                    &writer_tx,
1183                    &routes,
1184                    &mut retry_buffer,
1185                );
1186                if retried > 0 {
1187                    log::debug!(
1188                        "subc attach: retried {retried} reliable Push frame(s) after writer backpressure"
1189                    );
1190                }
1191
1192                let due_roots: Vec<ProjectRootId> = live_roots
1193                    .iter_mut()
1194                    .filter_map(|(root_id, meta)| {
1195                        if meta.maintenance_pending {
1196                            None
1197                        } else {
1198                            meta.maintenance_pending = true;
1199                            Some(root_id.clone())
1200                        }
1201                    })
1202                    .collect();
1203                for root_id in due_roots {
1204                    submit_maintenance_drain(&executor, root_id, &maintenance_tx);
1205                }
1206            }
1207        }
1208    };
1209
1210    // The reader task may be parked on `read_frame`; abort it (we are done with
1211    // the connection) and flush the writer.
1212    reader_task.abort();
1213    drop(writer_tx);
1214    let writer_result = finish_writer_task(writer_task).await;
1215    loop_result.and(writer_result)
1216}
1217
1218fn spawn_writer_task<W>(
1219    mut write: W,
1220    mut rx: mpsc::Receiver<Frame>,
1221) -> JoinHandle<Result<(), subc_transport::FrameIoError>>
1222where
1223    W: AsyncWrite + Unpin + Send + 'static,
1224{
1225    tokio::spawn(async move {
1226        while let Some(frame) = rx.recv().await {
1227            write_frame(&mut write, &frame).await?;
1228        }
1229        Ok(())
1230    })
1231}
1232
1233/// Owns the read half and reads whole frames sequentially. `read_frame` is not
1234/// cancellation-safe, so it must run here — never inside the main loop's
1235/// `select!` — to keep the inbound stream framed. Each frame (or the terminal
1236/// error / EOF) is forwarded over `tx`; the loop consumes them via cancel-safe
1237/// `recv()`. Exits on EOF (Ok(None)), a read error, or when `tx` is dropped
1238/// (the loop ended and aborted us).
1239fn spawn_reader_task<R>(mut read: R, tx: mpsc::Sender<Result<Frame, SubcError>>) -> JoinHandle<()>
1240where
1241    R: AsyncRead + Unpin + Send + 'static,
1242{
1243    tokio::spawn(async move {
1244        loop {
1245            match read_frame(&mut read).await {
1246                Ok(Some(frame)) => {
1247                    if tx.send(Ok(frame)).await.is_err() {
1248                        return;
1249                    }
1250                }
1251                Ok(None) => {
1252                    // EOF: let the loop observe channel close as "daemon closed".
1253                    return;
1254                }
1255                Err(error) => {
1256                    let _ = tx.send(Err(SubcError::FrameIo(error))).await;
1257                    return;
1258                }
1259            }
1260        }
1261    })
1262}
1263
1264async fn finish_writer_task(
1265    mut writer_task: JoinHandle<Result<(), subc_transport::FrameIoError>>,
1266) -> Result<(), SubcError> {
1267    match tokio::time::timeout(Duration::from_millis(100), &mut writer_task).await {
1268        Ok(Ok(Ok(()))) => Ok(()),
1269        Ok(Ok(Err(error))) => Err(SubcError::FrameIo(error)),
1270        Ok(Err(error)) => Err(SubcError::WriterJoin(error)),
1271        Err(_) => {
1272            writer_task.abort();
1273            Ok(())
1274        }
1275    }
1276}
1277
1278async fn send_frame(tx: &mpsc::Sender<Frame>, frame: Frame) -> Result<(), SubcError> {
1279    match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(frame)).await {
1280        Ok(Ok(())) => Ok(()),
1281        Ok(Err(_)) => Err(SubcError::WriterClosed),
1282        Err(_) => Err(SubcError::WriterBackpressureTimeout),
1283    }
1284}
1285
1286fn rollback_pending_bind_actor(
1287    executor: &Arc<Executor>,
1288    live_roots: &HashMap<ProjectRootId, RootMeta>,
1289    root_id: &ProjectRootId,
1290    inserted_new_actor: bool,
1291) {
1292    if inserted_new_actor && !live_roots.contains_key(root_id) {
1293        executor.remove_actor(root_id);
1294    }
1295}
1296
1297async fn handle_route_bind_completion(
1298    tx: &mpsc::Sender<Frame>,
1299    completion: RouteBindCompletion,
1300    routes: &mut HashMap<RouteChannel, RouteIdentity>,
1301    root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
1302    session_identity: &mut HashMap<(ProjectRootId, String), String>,
1303    push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1304    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1305    pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1306    executor: &Arc<Executor>,
1307    shutdown: &Arc<Notify>,
1308) -> Result<(), SubcError> {
1309    let route_id = route_key(completion.route_channel);
1310    let Some(pending) = pending_binds.remove(&route_id) else {
1311        log::warn!(
1312            "subc attach: dropping RouteBind completion for non-pending route {}",
1313            completion.route_channel
1314        );
1315        rollback_pending_bind_actor(
1316            executor,
1317            live_roots,
1318            &completion.bind_root_id,
1319            completion.inserted_new_actor,
1320        );
1321        return Ok(());
1322    };
1323
1324    if pending.bind_root_id != completion.bind_root_id {
1325        log::warn!(
1326            "subc attach: pending RouteBind root mismatch for route {} (pending {} completion {})",
1327            completion.route_channel,
1328            pending.bind_root_id.as_path().display(),
1329            completion.bind_root_id.as_path().display()
1330        );
1331    }
1332
1333    let inserted_new_actor = pending.inserted_new_actor || completion.inserted_new_actor;
1334    if pending.cancelled {
1335        rollback_pending_bind_actor(
1336            executor,
1337            live_roots,
1338            &completion.bind_root_id,
1339            inserted_new_actor,
1340        );
1341        log::debug!(
1342            "subc attach: discarded completed RouteBind for cancelled route {} root {}",
1343            completion.route_channel,
1344            completion.bind_root_id.as_path().display()
1345        );
1346        return Ok(());
1347    }
1348
1349    let failure = if !completion.configure_response.success {
1350        Some((
1351            &completion.configure_response,
1352            "configure failed during route bind",
1353        ))
1354    } else if let Some(drain_response) = completion.drain_response.as_ref() {
1355        if drain_response.success {
1356            None
1357        } else {
1358            Some((
1359                drain_response,
1360                "build-completion drain failed during route bind",
1361            ))
1362        }
1363    } else {
1364        None
1365    };
1366
1367    if let Some((response, fallback)) = failure {
1368        rollback_pending_bind_actor(
1369            executor,
1370            live_roots,
1371            &completion.bind_root_id,
1372            inserted_new_actor,
1373        );
1374        let message = response_message(response, fallback);
1375        let fatal = response_is_internal_error(response);
1376        send_route_bind_error_parts(
1377            tx,
1378            completion.ver,
1379            completion.corr,
1380            completion.flags,
1381            "config_divergence",
1382            &message,
1383        )
1384        .await?;
1385        if fatal {
1386            signal_fatal_teardown(
1387                tx,
1388                Some(completion.route_channel),
1389                completion.ver,
1390                completion.corr,
1391                shutdown,
1392            )
1393            .await;
1394        }
1395        return Ok(());
1396    }
1397
1398    remember_session_identity(session_identity, &completion.identity);
1399    let replay_key = ReplayKey::from_identity(&completion.identity);
1400    insert_route_channel(routes, root_channels, route_id, completion.identity);
1401    live_roots
1402        .entry(completion.bind_root_id.clone())
1403        .and_modify(|meta| {
1404            meta.touch();
1405            meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1406        })
1407        .or_insert_with(|| RootMeta::new(Instant::now()));
1408    if let Some(meta) = live_roots.get_mut(&completion.bind_root_id) {
1409        meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1410    }
1411
1412    let ack =
1413        serde_json::to_vec(&ModuleControlResponse::RouteBindAck {}).map_err(SubcError::Json)?;
1414    let response = Frame::build_with_version(
1415        completion.ver,
1416        FrameType::Response,
1417        control_flags(),
1418        0,
1419        completion.corr,
1420        ack,
1421    )
1422    .map_err(SubcError::FrameBuild)?;
1423    send_frame(tx, response).await?;
1424    let replayed = replay_buffered_push_frames(tx, route_id, push_buffer, &replay_key);
1425    if replayed > 0 {
1426        log::debug!(
1427            "subc attach: replayed {} buffered Push frame(s) to route {} root {} harness {} session {}",
1428            replayed,
1429            completion.route_channel,
1430            replay_key.root.as_path().display(),
1431            replay_key.harness,
1432            replay_key.session
1433        );
1434    }
1435    log::info!(
1436        "subc attach: route {} bound to root {}",
1437        completion.route_channel,
1438        completion.bind_root_id.as_path().display()
1439    );
1440    Ok(())
1441}
1442
1443/// channel-0 control request — currently only RouteBind. Reconciles the route's
1444/// RootConfig through the executor's Mutating lane and resolves completion on a
1445/// loop-owned control-completion channel so slow configure jobs do not block the
1446/// transport loop.
1447async fn handle_control_request(
1448    tx: &mpsc::Sender<Frame>,
1449    frame: &Frame,
1450    shared_app: &Arc<App>,
1451    executor: &Arc<Executor>,
1452    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1453    pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1454    control_completion_tx: &mpsc::Sender<RouteBindCompletion>,
1455    push_senders: &PushSenders,
1456    dispatch: DispatchFn,
1457    user_config_path: Option<&Path>,
1458) -> Result<(), SubcError> {
1459    let request =
1460        serde_json::from_slice::<ModuleControlRequest>(&frame.body).map_err(SubcError::Json)?;
1461    match request {
1462        ModuleControlRequest::RouteBind {
1463            route_channel,
1464            target: _,
1465            identity,
1466            // Any wire-relayed `config` field is ignored via `..`: AFT reads config
1467            // from CortexKit files, never the wire. `..` keeps this tolerant whether
1468            // the protocol version still carries the field or has dropped it, so a
1469            // protocol field-removal cannot break this destructure either way.
1470            ..
1471        } => {
1472            let route_id = route_key(route_channel);
1473            if pending_binds.contains_key(&route_id) {
1474                return send_route_bind_error(
1475                    tx,
1476                    frame,
1477                    "config_divergence",
1478                    "route bind is already pending for channel",
1479                )
1480                .await;
1481            }
1482
1483            let bind_root_id = match ProjectRootId::from_path(&identity.project_root) {
1484                Ok(root_id) => root_id,
1485                Err(error) => {
1486                    return send_route_bind_error(
1487                        tx,
1488                        frame,
1489                        "config_divergence",
1490                        &format!("invalid route project root: {error}"),
1491                    )
1492                    .await;
1493                }
1494            };
1495
1496            // Reconcile RootConfig: build a configure request from the bind
1497            // identity + forwarded config tiers and run it through the executor.
1498            let request_id = format!("subc-bind-{route_channel}");
1499            let bind_project_root = identity.project_root.clone();
1500            let bind_harness = identity.harness.clone();
1501            let bind_session = identity.session.clone();
1502
1503            // Config is single-per-project, read by AFT directly from the
1504            // CortexKit config files (user: ~/.config/cortexkit/aft.jsonc,
1505            // project: <root>/.cortexkit/aft.jsonc). Wire-relayed config tiers are
1506            // IGNORED entirely: a front (runner or mcp:*) cannot push config over
1507            // the wire. This is what makes config harness-INDEPENDENT — every
1508            // harness binding a project gets the identical on-disk config, so two
1509            // trust domains sharing the per-root actor can never diverge or
1510            // inherit each other's capabilities (the cross-bind escalation class).
1511            // Wire-relayed config tiers (if the protocol still carries them) are
1512            // ignored entirely; the per-tier trust boundary (user trusted, project
1513            // privileged-dropped) is applied to the FILE tiers in handle_configure.
1514            let local_tiers = crate::subc_config::read_local_cortexkit_config_tiers(
1515                user_config_path,
1516                Path::new(&bind_project_root),
1517            );
1518            let config_tiers: Vec<Value> = local_tiers
1519                .iter()
1520                .map(|t| json!({ "tier": t.tier, "source": t.source, "doc": t.doc }))
1521                .collect();
1522            let diagnostics_on_edit = diagnostics_on_edit_from_tiers(&local_tiers);
1523            let configure_json = json!({
1524                "id": request_id,
1525                "command": "configure",
1526                "project_root": bind_project_root,
1527                "harness": bind_harness,
1528                "session_id": bind_session.clone(),
1529                "config": config_tiers,
1530            });
1531            let configure_req = match serde_json::from_value::<RawRequest>(configure_json) {
1532                Ok(req) => req,
1533                Err(error) => {
1534                    return send_route_bind_error(
1535                        tx,
1536                        frame,
1537                        "config_divergence",
1538                        &format!("failed to build configure request: {error}"),
1539                    )
1540                    .await;
1541                }
1542            };
1543
1544            let route_identity = RouteIdentity {
1545                root: bind_root_id.clone(),
1546                project_root: PathBuf::from(&bind_project_root),
1547                harness: bind_harness.clone(),
1548                session: bind_session.clone(),
1549            };
1550            let configure_session = route_identity.session.clone();
1551            let root_was_live = live_roots.contains_key(&bind_root_id);
1552            let inserted_new_actor = if root_was_live {
1553                log::debug!(
1554                    "subc attach: reusing actor for route {} root {}",
1555                    route_channel,
1556                    bind_root_id.as_path().display()
1557                );
1558                false
1559            } else {
1560                let actor_ctx = Arc::new(AppContext::from_app(
1561                    Arc::clone(shared_app),
1562                    Config::default(),
1563                ));
1564                install_bash_compressor(&actor_ctx);
1565                actor_ctx.set_progress_sender(Some(progress_sender_for_root(
1566                    push_senders.clone(),
1567                    bind_root_id.clone(),
1568                )));
1569                let inserted =
1570                    executor.register_actor(bind_root_id.clone(), Arc::clone(&actor_ctx));
1571                drop(actor_ctx);
1572                // Do not insert into live_roots until configure succeeds: live_roots
1573                // drives maintenance, and a half-configured new actor must not be
1574                // maintenance-eligible before its route/session identity exists.
1575                log::debug!(
1576                    "subc attach: registered actor for route {} root {}",
1577                    route_channel,
1578                    bind_root_id.as_path().display()
1579                );
1580                inserted
1581            };
1582
1583            pending_binds.insert(
1584                route_id,
1585                PendingBind {
1586                    bind_root_id: bind_root_id.clone(),
1587                    inserted_new_actor,
1588                    cancelled: false,
1589                },
1590            );
1591
1592            let configure_request_id = configure_req.id.clone();
1593            let configure_rx = executor.submit_async(
1594                bind_root_id.clone(),
1595                Lane::Mutating,
1596                configure_request_id.clone(),
1597                Box::new(move |ctx| {
1598                    log_ctx::with_session(Some(configure_session.clone()), || {
1599                        dispatch(configure_req, ctx)
1600                    })
1601                }),
1602            );
1603
1604            let completion_tx = control_completion_tx.clone();
1605            let completion_executor = Arc::clone(executor);
1606            let completion_identity = route_identity;
1607            let completion_root = bind_root_id.clone();
1608            let completion_route_channel = route_channel;
1609            let completion_ver = frame.header.ver;
1610            let completion_corr = frame.header.corr;
1611            let completion_flags = frame.header.flags;
1612            tokio::spawn(async move {
1613                let configure_response =
1614                    await_executor_response(configure_rx, configure_request_id.clone()).await;
1615                let drain_response = if configure_response.success && !root_was_live {
1616                    let drain_request_id = format!("subc-bind-drain-{completion_route_channel}");
1617                    let drain_response_id = drain_request_id.clone();
1618                    let drain_rx = completion_executor.submit_async(
1619                        completion_root.clone(),
1620                        Lane::Mutating,
1621                        drain_request_id.clone(),
1622                        Box::new(move |ctx| {
1623                            runtime_drain::drain_build_completions(ctx);
1624                            Response::success(drain_response_id, json!({ "drained": true }))
1625                        }),
1626                    );
1627                    Some(await_executor_response(drain_rx, drain_request_id).await)
1628                } else {
1629                    None
1630                };
1631
1632                let completion = RouteBindCompletion {
1633                    route_channel: completion_route_channel,
1634                    identity: completion_identity,
1635                    bind_root_id: completion_root,
1636                    inserted_new_actor,
1637                    configure_response,
1638                    drain_response,
1639                    diagnostics_on_edit,
1640                    ver: completion_ver,
1641                    corr: completion_corr,
1642                    flags: completion_flags,
1643                };
1644                if completion_tx.send(completion).await.is_err() {
1645                    log::debug!(
1646                        "subc attach: dropped RouteBind completion for route {} after loop exit",
1647                        completion_route_channel
1648                    );
1649                }
1650            });
1651
1652            Ok(())
1653        }
1654    }
1655}
1656
1657fn install_bash_compressor(ctx: &AppContext) {
1658    // Mirrors main.rs per-actor compressor installation for subc-created actors.
1659    let filter_registry_handle = ctx.shared_filter_registry();
1660    let compress_flag = ctx.bash_compress_flag();
1661    ctx.bash_background().set_compressor_with_exit_code(
1662        move |command: &str, output: String, exit_code: Option<i32>| {
1663            if !compress_flag.load(std::sync::atomic::Ordering::Relaxed) {
1664                return crate::compress::CompressionResult::new(output);
1665            }
1666            let registry_guard = match filter_registry_handle.read() {
1667                Ok(g) => g,
1668                Err(poisoned) => poisoned.into_inner(),
1669            };
1670            crate::compress::compress_with_registry_exit_code(
1671                command,
1672                &output,
1673                exit_code,
1674                &registry_guard,
1675            )
1676        },
1677    );
1678}
1679
1680fn diagnostics_on_edit_from_tiers(tiers: &[ConfigTier]) -> bool {
1681    let mut diagnostics_on_edit = false;
1682    for tier in tiers {
1683        if let Some(value) = diagnostics_on_edit_from_doc(&tier.doc) {
1684            diagnostics_on_edit = value;
1685        }
1686    }
1687    diagnostics_on_edit
1688}
1689
1690fn diagnostics_on_edit_from_doc(doc: &str) -> Option<bool> {
1691    let stripped = strip_jsonc_for_subc(doc);
1692    let value = serde_json::from_str::<Value>(&stripped).ok()?;
1693    value
1694        .get("lsp")
1695        .and_then(Value::as_object)?
1696        .get("diagnostics_on_edit")
1697        .and_then(Value::as_bool)
1698}
1699
1700fn strip_jsonc_for_subc(source: &str) -> String {
1701    strip_trailing_commas_for_subc(&strip_jsonc_comments_for_subc(source))
1702}
1703
/// Removes `//` line comments and `/* */` block comments from JSONC text.
///
/// String literals (including backslash escapes) are copied through untouched,
/// so comment markers inside strings survive. Newlines that terminate a line
/// comment or occur inside a block comment are kept, which preserves line
/// numbering. An unterminated comment swallows the rest of the input.
fn strip_jsonc_comments_for_subc(source: &str) -> String {
    /// Where the scanner currently is within the document.
    enum Scan {
        /// Ordinary JSON outside strings and comments.
        Code,
        /// Inside a string literal.
        InString,
        /// Inside a string literal, right after a backslash.
        AfterBackslash,
        /// Inside a `//` comment.
        LineComment,
        /// Inside a `/* */` comment; the previous character was not `*`.
        BlockComment,
        /// Inside a `/* */` comment; the previous character was `*`.
        BlockCommentStar,
    }

    let mut cleaned = String::with_capacity(source.len());
    let mut cursor = source.chars().peekable();
    let mut scan = Scan::Code;

    while let Some(current) = cursor.next() {
        scan = match scan {
            Scan::Code => match current {
                '"' => {
                    cleaned.push(current);
                    Scan::InString
                }
                '/' => match cursor.peek().copied() {
                    Some('/') => {
                        cursor.next();
                        Scan::LineComment
                    }
                    Some('*') => {
                        cursor.next();
                        Scan::BlockComment
                    }
                    // A lone slash is not a comment opener; keep it.
                    _ => {
                        cleaned.push(current);
                        Scan::Code
                    }
                },
                _ => {
                    cleaned.push(current);
                    Scan::Code
                }
            },
            Scan::InString => {
                cleaned.push(current);
                match current {
                    '\\' => Scan::AfterBackslash,
                    '"' => Scan::Code,
                    _ => Scan::InString,
                }
            }
            Scan::AfterBackslash => {
                // Whatever follows a backslash is part of the string, even `"`.
                cleaned.push(current);
                Scan::InString
            }
            Scan::LineComment => {
                if current == '\n' {
                    cleaned.push('\n');
                    Scan::Code
                } else {
                    Scan::LineComment
                }
            }
            Scan::BlockComment => match current {
                '\n' => {
                    cleaned.push('\n');
                    Scan::BlockComment
                }
                '*' => Scan::BlockCommentStar,
                _ => Scan::BlockComment,
            },
            Scan::BlockCommentStar => match current {
                '/' => Scan::Code,
                '*' => Scan::BlockCommentStar,
                '\n' => {
                    cleaned.push('\n');
                    Scan::BlockComment
                }
                _ => Scan::BlockComment,
            },
        };
    }

    cleaned
}
1763
/// Drops every comma that is followed, after optional whitespace, by a
/// closing `}` or `]`.
///
/// Commas inside string literals are never touched; backslash escapes are
/// honoured when looking for the end of a string. All other characters,
/// including the whitespace after a removed comma, are copied through.
fn strip_trailing_commas_for_subc(source: &str) -> String {
    let mut cleaned = String::with_capacity(source.len());
    let mut inside_string = false;
    let mut after_backslash = false;

    for (offset, current) in source.char_indices() {
        if inside_string {
            cleaned.push(current);
            if after_backslash {
                after_backslash = false;
            } else {
                match current {
                    '\\' => after_backslash = true,
                    '"' => inside_string = false,
                    _ => {}
                }
            }
            continue;
        }

        match current {
            '"' => {
                inside_string = true;
                cleaned.push(current);
            }
            ',' => {
                // `,` is one byte in UTF-8, so `offset + 1` is a char boundary.
                let upcoming = source[offset + 1..].trim_start();
                let closes_container = upcoming.starts_with('}') || upcoming.starts_with(']');
                if !closes_container {
                    cleaned.push(current);
                }
            }
            _ => cleaned.push(current),
        }
    }

    cleaned
}
1810
1811async fn send_route_bind_error(
1812    tx: &mpsc::Sender<Frame>,
1813    frame: &Frame,
1814    code: &str,
1815    message: &str,
1816) -> Result<(), SubcError> {
1817    send_route_bind_error_parts(
1818        tx,
1819        frame.header.ver,
1820        frame.header.corr,
1821        frame.header.flags,
1822        code,
1823        message,
1824    )
1825    .await
1826}
1827
1828async fn send_route_bind_error_parts(
1829    tx: &mpsc::Sender<Frame>,
1830    ver: u8,
1831    corr: u64,
1832    flags: Flags,
1833    code: &str,
1834    message: &str,
1835) -> Result<(), SubcError> {
1836    let response = build_error_frame(ver, 0, corr, flags, code, message)?;
1837    send_frame(tx, response).await?;
1838    log::warn!("subc attach: route bind rejected ({code}): {message}");
1839    Ok(())
1840}
1841
/// Route-channel tool call: `{name, arguments}` → executor lane → dispatch to
/// the sync command core → wrap the structured Response in a CallToolResult
/// `{content, isError}`. v1 mapping: the whole `{success, ...}` Response
/// serialized into ONE text block; `isError` carries `success == false`.
///
/// Early exits, each of which answers the caller on `tx` and returns:
/// - the route still has a pending bind, or has no bound identity →
///   `route_not_bound` Error frame;
/// - a core agent tool fails translation → the translation error as a tool
///   response;
/// - a non-core tool name while `allow_native_passthrough` is false →
///   `unknown_tool` tool response;
/// - the translated arguments do not deserialize into a `RawRequest` →
///   `invalid_request` tool response.
///
/// Otherwise the request is submitted to `executor` and this function returns
/// without waiting; a spawned task awaits the executor response and queues
/// the resulting frame on `tx`.
///
/// # Errors
/// Returns `SubcError` when the frame body is not a valid `ToolCallRequest`,
/// when an error/response frame cannot be built on one of the early-exit
/// paths, or when `send_frame` fails there.
async fn handle_tool_call(
    tx: &mpsc::Sender<Frame>,
    frame: &Frame,
    routes: &HashMap<RouteChannel, RouteIdentity>,
    pending_binds: &HashMap<RouteChannel, PendingBind>,
    live_roots: &mut HashMap<ProjectRootId, RootMeta>,
    executor: &Arc<Executor>,
    shutdown: &Arc<Notify>,
    dispatch: DispatchFn,
    allow_native_passthrough: bool,
) -> Result<(), SubcError> {
    let route_id = route_key(frame.header.channel);
    // A route whose bind is still pending is answered exactly like an unbound
    // route.
    if pending_binds.contains_key(&route_id) {
        let error = build_error_frame(
            frame.header.ver,
            frame.header.channel,
            frame.header.corr,
            frame.header.flags,
            "route_not_bound",
            "route is not bound before tool call",
        )?;
        return send_frame(tx, error).await;
    }

    let Some(identity) = routes.get(&route_id).cloned() else {
        let error = build_error_frame(
            frame.header.ver,
            frame.header.channel,
            frame.header.corr,
            frame.header.flags,
            "route_not_bound",
            "route is not bound before tool call",
        )?;
        return send_frame(tx, error).await;
    };
    // Touch the root's metadata when the root is tracked in `live_roots`.
    if let Some(meta) = live_roots.get_mut(&identity.root) {
        meta.touch();
    }

    let call = serde_json::from_slice::<ToolCallRequest>(&frame.body).map_err(SubcError::Json)?;
    let bare_name = call.name.clone();
    // Built before translation so the error responses below can be formatted
    // with the same context as a successful response.
    let format_context = crate::subc_format::FormatContext::from_tool_call(
        &bare_name,
        &call.arguments,
        identity.project_root.as_path(),
    );

    let request_id = format!("subc-{}-{}", frame.header.channel, frame.header.corr);
    let project_root = identity.project_root.as_path();
    // Falls back to `false` when the root has no entry in `live_roots`.
    let diagnostics_on_edit = live_roots
        .get(&identity.root)
        .map(|meta| meta.diagnostics_on_edit)
        .unwrap_or(false);
    let translate_context = crate::subc_translate::TranslateContext {
        diagnostics_on_edit,
    };
    let (command, translated_args) = match crate::subc_translate::subc_translate_with_context(
        &call.name,
        &call.arguments,
        project_root,
        translate_context,
    ) {
        Ok(t) => (t.command, t.args),
        // A core agent tool that fails translation is a real client error
        // (e.g. `edit` with no resolvable mode) — surface it, never guess.
        Err(err) if is_subc_agent_core_tool(&call.name) => {
            let response = Response::error(request_id.clone(), err.code, err.message);
            let response_frame = build_tool_response_frame(
                frame.header.ver,
                frame.header.channel,
                frame.header.corr,
                frame.header.flags,
                &bare_name,
                &format_context,
                &response,
            )?;
            return send_frame(tx, response_frame).await;
        }
        // A non-core name is NOT in the tool manifest. AFT fails closed and
        // does not trust subc to enforce the manifest: rejecting here is the
        // defense-in-depth backstop that prevents a forwarded native command
        // (e.g. `configure`, which would reach handle_configure and bypass
        // the RouteBind config-trust cap) from ever reaching dispatch. Only
        // the integration-test harness (run_subc_mode_for_test) opens this to
        // drive synthetic native commands through the executor.
        Err(_) if !allow_native_passthrough => {
            log::warn!(
                "subc tool call: rejecting non-manifest tool name {:?} on route {} (fail-closed)",
                call.name,
                frame.header.channel
            );
            let response = Response::error(
                request_id.clone(),
                "unknown_tool",
                format!("tool {:?} is not in the AFT tool manifest", call.name),
            );
            let response_frame = build_tool_response_frame(
                frame.header.ver,
                frame.header.channel,
                frame.header.corr,
                frame.header.flags,
                &bare_name,
                &format_context,
                &response,
            )?;
            return send_frame(tx, response_frame).await;
        }
        // Test-only: non-core names are native commands (the integration-test
        // synthetic tools) — pass them through verbatim.
        Err(_) => {
            // Non-object arguments degrade to an empty argument map.
            let map = call.arguments.as_object().cloned().unwrap_or_default();
            (call.name.clone(), map)
        }
    };

    let lane = command_lane(&command);
    let command_for_finalize = command.clone();
    let session_for_finalize = identity.session.clone();
    // Inserted after translation, so these three keys overwrite any same-named
    // entries that came from the translated arguments.
    let mut map = translated_args;
    map.insert("id".to_string(), json!(request_id.clone()));
    map.insert("command".to_string(), json!(command));
    map.insert("session_id".to_string(), json!(identity.session.clone()));

    let raw_req = match serde_json::from_value::<RawRequest>(Value::Object(map)) {
        Ok(req) => req,
        Err(error) => {
            let response = Response::error(
                request_id.clone(),
                "invalid_request",
                format!("failed to build request from tool call: {error}"),
            );
            let response_frame = build_tool_response_frame(
                frame.header.ver,
                frame.header.channel,
                frame.header.corr,
                frame.header.flags,
                &bare_name,
                &format_context,
                &response,
            )?;
            return send_frame(tx, response_frame).await;
        }
    };

    let bare_name_for_frame = bare_name.clone();
    let format_context_for_frame = format_context;
    // The closure is handed to the executor together with the root and lane:
    // dispatch first, then attach background completions and the status bar
    // to the response, all under the route's session log context.
    let rx = executor.submit_async(
        identity.root,
        lane,
        request_id.clone(),
        Box::new(move |ctx| {
            log_ctx::with_session(Some(session_for_finalize.clone()), || {
                let mut response = dispatch(raw_req, ctx);
                crate::response_finalize::attach_bg_completions(
                    &mut response,
                    ctx,
                    &session_for_finalize,
                    &command_for_finalize,
                );
                crate::response_finalize::attach_status_bar(
                    &mut response,
                    ctx,
                    &command_for_finalize,
                );
                response
            })
        }),
    );
    // Copy everything the completion task needs out of `frame`; the spawned
    // task cannot borrow from it.
    let completion_tx = tx.clone();
    let completion_shutdown = Arc::clone(shutdown);
    let route_channel = frame.header.channel;
    let corr = frame.header.corr;
    let flags = frame.header.flags;
    let ver = frame.header.ver;
    let is_mutating = lane == Lane::Mutating;
    tokio::spawn(async move {
        let response = await_executor_response(rx, request_id.clone()).await;
        // Only an `internal_error` on the mutating lane triggers teardown.
        let fatal = is_mutating && response_is_internal_error(&response);
        match build_tool_response_frame(
            ver,
            route_channel,
            corr,
            flags,
            &bare_name_for_frame,
            &format_context_for_frame,
            &response,
        ) {
            Ok(response_frame) => {
                // Best effort: a failed send is ignored here.
                let _ = completion_tx.send(response_frame).await;
            }
            Err(error) => {
                log::error!("subc attach: failed to build tool response frame: {error}");
            }
        }
        // Teardown is signalled after the response frame has been queued (or
        // has failed to build).
        if fatal {
            signal_fatal_teardown(
                &completion_tx,
                Some(route_channel),
                ver,
                corr,
                &completion_shutdown,
            )
            .await;
        }
    });
    Ok(())
}
2053
2054fn submit_maintenance_drain(
2055    executor: &Arc<Executor>,
2056    root_id: ProjectRootId,
2057    completion_tx: &mpsc::Sender<(ProjectRootId, Response)>,
2058) {
2059    let request_id = format!(
2060        "subc-maintenance-drain-{}",
2061        root_id.as_path().to_string_lossy()
2062    );
2063    let response_id = request_id.clone();
2064    let completion_root_id = root_id.clone();
2065    let rx = executor.submit_async(
2066        root_id,
2067        Lane::Mutating,
2068        request_id.clone(),
2069        Box::new(move |ctx| {
2070            runtime_drain::drain_configure_warning_events(ctx);
2071            runtime_drain::drain_search_index_events(ctx);
2072            runtime_drain::drain_callgraph_store_events(ctx);
2073            runtime_drain::drain_semantic_index_events(ctx);
2074            runtime_drain::drain_semantic_refresh_events(ctx);
2075            runtime_drain::drain_inspect_events(ctx);
2076            runtime_drain::drain_watcher_events(ctx);
2077            runtime_drain::drain_lsp_events(ctx);
2078            Response::success(response_id, json!({ "drained": true }))
2079        }),
2080    );
2081    let completion_tx = completion_tx.clone();
2082    tokio::spawn(async move {
2083        let response = await_executor_response(rx, request_id).await;
2084        let _ = completion_tx.send((completion_root_id, response)).await;
2085    });
2086}
2087
2088async fn await_executor_response(rx: oneshot::Receiver<Response>, request_id: String) -> Response {
2089    rx.await
2090        .unwrap_or_else(|_| Response::error(request_id, "internal_error", "executor dropped"))
2091}
2092
2093fn build_tool_response_frame(
2094    ver: u8,
2095    route_channel: u16,
2096    corr: u64,
2097    flags: Flags,
2098    bare_name: &str,
2099    format_context: &crate::subc_format::FormatContext,
2100    response: &Response,
2101) -> Result<Frame, SubcError> {
2102    let text =
2103        crate::subc_format::format_response_with_context(bare_name, response, format_context);
2104    let is_error = !response.success;
2105    let result = json!({
2106        "content": [{ "type": "text", "text": text }],
2107        "isError": is_error,
2108    });
2109    let body = serde_json::to_vec(&result).map_err(SubcError::Json)?;
2110
2111    Frame::build_with_version(ver, FrameType::Response, flags, route_channel, corr, body)
2112        .map_err(SubcError::FrameBuild)
2113}
2114
2115fn build_error_frame(
2116    ver: u8,
2117    channel: u16,
2118    corr: u64,
2119    flags: Flags,
2120    code: &str,
2121    message: &str,
2122) -> Result<Frame, SubcError> {
2123    let body = serde_json::to_vec(&ErrorBody {
2124        code: code.to_string(),
2125        message: message.to_string(),
2126    })
2127    .map_err(SubcError::Json)?;
2128    Frame::build_with_version(ver, FrameType::Error, flags, channel, corr, body)
2129        .map_err(SubcError::FrameBuild)
2130}
2131
2132fn build_goodbye_frame(ver: u8, channel: u16, corr: u64) -> Result<Frame, SubcError> {
2133    Frame::build_with_version(
2134        ver,
2135        FrameType::Goodbye,
2136        control_flags(),
2137        channel,
2138        corr,
2139        Vec::new(),
2140    )
2141    .map_err(SubcError::FrameBuild)
2142}
2143
2144async fn signal_fatal_teardown(
2145    tx: &mpsc::Sender<Frame>,
2146    route_channel: Option<u16>,
2147    ver: u8,
2148    corr: u64,
2149    shutdown: &Arc<Notify>,
2150) {
2151    if let Some(route_channel) = route_channel {
2152        if let Ok(frame) = build_goodbye_frame(ver, route_channel, corr) {
2153            if let Err(error) = send_frame(tx, frame).await {
2154                log::warn!(
2155                    "subc attach: failed to queue fatal route Goodbye for route {route_channel}: {error}"
2156                );
2157            }
2158        }
2159    }
2160    if let Ok(frame) = build_goodbye_frame(ver, 0, 0) {
2161        if let Err(error) = send_frame(tx, frame).await {
2162            log::warn!("subc attach: failed to queue fatal channel-0 Goodbye: {error}");
2163        }
2164    }
2165    shutdown.notify_one();
2166}
2167
2168fn response_message(response: &Response, fallback: &str) -> String {
2169    response
2170        .data
2171        .get("message")
2172        .and_then(Value::as_str)
2173        .map(ToOwned::to_owned)
2174        .unwrap_or_else(|| fallback.to_string())
2175}
2176
2177fn response_is_internal_error(response: &Response) -> bool {
2178    !response.success && response.data.get("code").and_then(Value::as_str) == Some("internal_error")
2179}
2180
/// Reports whether `name` is one of the eight core agent tool names.
///
/// The comparison is exact: case, whitespace and prefixes all matter.
fn is_subc_agent_core_tool(name: &str) -> bool {
    const CORE_TOOLS: [&str; 8] = [
        "status", "read", "write", "edit", "grep", "search", "outline", "inspect",
    ];
    CORE_TOOLS.iter().any(|tool| *tool == name)
}
2187
2188fn command_lane(command: &str) -> Lane {
2189    match command {
2190        "ping"
2191        | "version"
2192        | "echo"
2193        | "bash_drain_completions"
2194        | "bash_regex_match"
2195        | "db_get_state"
2196        | "db_get_host_state"
2197        | "read"
2198        | "undo_preview"
2199        | "edit_history"
2200        | "checkpoint_paths"
2201        | "list_checkpoints"
2202        | "glob"
2203        | "grep"
2204        | "git_conflicts"
2205        | "ast_search" => Lane::PureRead,
2206
2207        // Lazy reads mutate parser/terminal/url caches on a miss, but are still
2208        // classified onto the reader pool; install races are handled at the
2209        // individual cache sites.
2210        "bash_status" | "outline" | "zoom" => Lane::PureRead,
2211
2212        "status"
2213        | "inspect"
2214        | "lsp_diagnostics"
2215        | "lsp_inspect"
2216        | "lsp_hover"
2217        | "lsp_goto_definition"
2218        | "lsp_find_references"
2219        | "lsp_prepare_rename" => Lane::SerialLspStatus,
2220
2221        "semantic_search" | "search" | "callers" | "impact" | "call_tree" | "trace_to"
2222        | "trace_to_symbol" | "trace_data" | "inspect_tier2_run" => Lane::HeavyInit,
2223
2224        "bash"
2225        | "bash_ack_completions"
2226        | "bash_notify"
2227        | "bash_unnotify"
2228        | "bash_promote"
2229        | "bash_kill"
2230        | "bash_write"
2231        | "db_set_state"
2232        | "db_set_host_state"
2233        | "undo"
2234        | "checkpoint"
2235        | "restore_checkpoint"
2236        | "write"
2237        | "delete_file"
2238        | "move_file"
2239        | "edit"
2240        | "edit_symbol"
2241        | "edit_match"
2242        | "batch"
2243        | "add_import"
2244        | "remove_import"
2245        | "organize_imports"
2246        | "configure"
2247        | "move_symbol"
2248        | "extract_function"
2249        | "inline_symbol"
2250        | "ast_replace"
2251        | "lsp_rename"
2252        | "list_filters"
2253        | "trust_filter_project"
2254        | "untrust_filter_project"
2255        | "snapshot" => Lane::Mutating,
2256
2257        _ => Lane::Mutating,
2258    }
2259}
2260
/// JSON body of a route-channel tool call: `{name, arguments}`.
#[derive(Debug, Deserialize)]
struct ToolCallRequest {
    /// Tool name as sent by the caller.
    name: String,
    /// Tool arguments. Optional on the wire: `#[serde(default)]` substitutes
    /// `Value`'s default when the field is absent.
    #[serde(default)]
    arguments: Value,
}
2267
2268static SUBC_TOOL_SCHEMAS: LazyLock<serde_json::Map<String, Value>> = LazyLock::new(|| {
2269    serde_json::from_str(include_str!("subc_tool_schemas.json"))
2270        .unwrap_or_else(|e| panic!("subc_tool_schemas.json: {e}"))
2271});
2272
2273fn tool_schema(name: &str) -> Value {
2274    SUBC_TOOL_SCHEMAS.get(name).cloned().unwrap_or_else(|| {
2275        log::warn!(
2276            "subc build_manifest: missing embedded schema for tool {name:?}; using placeholder"
2277        );
2278        json!({ "type": "object" })
2279    })
2280}
2281
2282/// AFT's subc-mode capability manifest. BARE tool names (the gateway owns the
2283/// `aft_` prefix); ModuleManaged concurrency (AFT schedules internally);
2284/// FirstParty trust. Minimal-but-conformant tool set for the spike — the full
2285/// bare set is locked before the gateway fronts AFT.
2286fn build_manifest() -> ModuleManifest {
2287    let tool = |name: &str, execution_mode: ExecutionMode| Tool {
2288        name: name.to_string(),
2289        execution_mode,
2290        schema: tool_schema(name),
2291    };
2292    // execution_mode keys on externally-observable side effects, NOT internal
2293    // ctx mutation: the readers warm AFT's own index/cache/symbol artifacts
2294    // (internal), not the user's workspace, so they are Pure. Only edit/write
2295    // produce observable file writes -> Mutating. There is no bash tool in this
2296    // core, so Unfenceable is unused here (it stays in the enum for bash/other
2297    // modules).
2298    ModuleManifest {
2299        module_id: "aft".to_string(),
2300        module_version: env!("CARGO_PKG_VERSION").to_string(),
2301        protocol_ver: PROTOCOL_VERSION,
2302        trust_tier: TrustTier::FirstParty,
2303        provides: vec![ProviderRole::ToolProvider {
2304            tools: vec![
2305                tool("status", ExecutionMode::Pure),
2306                tool("read", ExecutionMode::Pure),
2307                tool("grep", ExecutionMode::Pure),
2308                tool("search", ExecutionMode::Pure),
2309                tool("outline", ExecutionMode::Pure),
2310                tool("inspect", ExecutionMode::Pure),
2311                tool("edit", ExecutionMode::Mutating),
2312                tool("write", ExecutionMode::Mutating),
2313            ],
2314            identity_scope: vec![IdentityScope::Session, IdentityScope::Project],
2315            concurrency: Concurrency::ModuleManaged,
2316            emits_push: true,
2317            sub_supervises: true,
2318        }],
2319        consumes: Vec::new(),
2320        scheduled_tasks: Vec::new(),
2321        bindings: Bindings {
2322            storage: StorageBinding {
2323                kind: StorageKind::Sqlite,
2324                scope: StorageScope::Project,
2325                owns_schema: true,
2326            },
2327            vault_grants: Vec::new(),
2328            identity: IdentityBinding {
2329                requires: vec![IdentityScope::Project],
2330                optional: vec![IdentityScope::Session],
2331            },
2332        },
2333    }
2334}
2335
/// Flags for control frames built by this module (used by
/// `build_goodbye_frame`): passive priority with both boolean flags off.
// NOTE(review): the meaning of the two boolean `Flags::new` arguments is not
// visible from this file — confirm against `subc_protocol::Flags`.
fn control_flags() -> Flags {
    Flags::new(false, Priority::Passive, false)
}
2339
2340#[cfg(test)]
2341mod tests {
2342    use super::*;
2343    use crate::bash_background::BgTaskStatus;
2344    use crate::protocol::{
2345        BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, ConfigureWarningsFrame,
2346        ProgressFrame, StatusChangedFrame,
2347    };
2348    use serde_json::json;
2349
2350    fn test_root(name: &str) -> (tempfile::TempDir, ProjectRootId) {
2351        let dir = tempfile::Builder::new()
2352            .prefix(name)
2353            .tempdir()
2354            .expect("temp root");
2355        let root = ProjectRootId::from_path(dir.path()).expect("project root id");
2356        (dir, root)
2357    }
2358
2359    fn status_frame(seq: u64) -> PushFrame {
2360        status_frame_with_session(seq, None)
2361    }
2362
2363    fn status_frame_with_session(seq: u64, session_id: Option<&str>) -> PushFrame {
2364        PushFrame::StatusChanged(StatusChangedFrame {
2365            frame_type: "status_changed",
2366            session_id: session_id.map(str::to_string),
2367            snapshot: json!({ "seq": seq }),
2368        })
2369    }
2370
2371    fn completion_frame(task_id: &str) -> PushFrame {
2372        completion_frame_with_session(task_id, "session-1")
2373    }
2374
2375    fn completion_frame_with_session(task_id: &str, session_id: &str) -> PushFrame {
2376        PushFrame::BashCompleted(BashCompletedFrame {
2377            frame_type: "bash_completed",
2378            task_id: task_id.to_string(),
2379            session_id: session_id.to_string(),
2380            status: BgTaskStatus::Completed,
2381            exit_code: Some(0),
2382            command: format!("echo {task_id}"),
2383            output_preview: String::new(),
2384            output_truncated: false,
2385            original_tokens: None,
2386            compressed_tokens: None,
2387            tokens_skipped: false,
2388        })
2389    }
2390
2391    fn long_running_frame(task_id: &str, elapsed_ms: u64) -> PushFrame {
2392        long_running_frame_with_session(task_id, "session-1", elapsed_ms)
2393    }
2394
2395    fn long_running_frame_with_session(
2396        task_id: &str,
2397        session_id: &str,
2398        elapsed_ms: u64,
2399    ) -> PushFrame {
2400        PushFrame::BashLongRunning(BashLongRunningFrame {
2401            frame_type: "bash_long_running",
2402            task_id: task_id.to_string(),
2403            session_id: session_id.to_string(),
2404            command: format!("sleep {elapsed_ms}"),
2405            elapsed_ms,
2406        })
2407    }
2408
2409    fn pattern_match_frame(session_id: &str) -> PushFrame {
2410        PushFrame::BashPatternMatch(BashPatternMatchFrame {
2411            frame_type: "bash_pattern_match",
2412            task_id: "task-pattern".to_string(),
2413            session_id: session_id.to_string(),
2414            watch_id: "watch-1".to_string(),
2415            match_text: "needle".to_string(),
2416            match_offset: 7,
2417            context: "haystack needle".to_string(),
2418            once: true,
2419            reason: "pattern_match",
2420        })
2421    }
2422
2423    fn configure_warnings_frame(session_id: Option<&str>) -> PushFrame {
2424        PushFrame::ConfigureWarnings(ConfigureWarningsFrame {
2425            frame_type: "configure_warnings",
2426            session_id: session_id.map(str::to_string),
2427            project_root: "/tmp/subc-test".to_string(),
2428            source_file_count: 0,
2429            warnings: Vec::new(),
2430        })
2431    }
2432
2433    fn route_identity(root: &ProjectRootId, session_id: &str) -> RouteIdentity {
2434        RouteIdentity {
2435            root: root.clone(),
2436            project_root: root.as_path().to_path_buf(),
2437            harness: "opencode".to_string(),
2438            session: session_id.to_string(),
2439        }
2440    }
2441
2442    fn progress_frame(request_id: &str, kind: ProgressKind, chunk: &str) -> PushFrame {
2443        PushFrame::Progress(ProgressFrame::new(request_id, kind, chunk))
2444    }
2445
2446    fn status_seq(frame: &PushFrame) -> Option<u64> {
2447        match frame {
2448            PushFrame::StatusChanged(status) => status.snapshot.get("seq").and_then(|v| v.as_u64()),
2449            _ => None,
2450        }
2451    }
2452
2453    fn completion_task(frame: &PushFrame) -> Option<&str> {
2454        match frame {
2455            PushFrame::BashCompleted(completion) => Some(completion.task_id.as_str()),
2456            _ => None,
2457        }
2458    }
2459
2460    fn push_frame_task_id(frame: &Frame) -> Option<String> {
2461        let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2462        body.get("task_id")
2463            .and_then(serde_json::Value::as_str)
2464            .map(str::to_string)
2465    }
2466
2467    #[test]
2468    fn frame_classification_matches_push_delivery_contract() {
2469        let completion = completion_frame_with_session("done", "session-a");
2470        assert_eq!(frame_session(&completion), Some("session-a"));
2471        assert!(frame_is_reliable(&completion));
2472
2473        let long_running = long_running_frame_with_session("long", "session-b", 42);
2474        assert_eq!(frame_session(&long_running), Some("session-b"));
2475        assert!(!frame_is_reliable(&long_running));
2476
2477        let pattern_match = pattern_match_frame("session-c");
2478        assert_eq!(frame_session(&pattern_match), Some("session-c"));
2479        assert!(frame_is_reliable(&pattern_match));
2480
2481        let tagged_warnings = configure_warnings_frame(Some("session-d"));
2482        assert_eq!(frame_session(&tagged_warnings), Some("session-d"));
2483        assert!(frame_is_reliable(&tagged_warnings));
2484
2485        let untagged_warnings = configure_warnings_frame(None);
2486        assert_eq!(frame_session(&untagged_warnings), None);
2487        assert!(frame_is_reliable(&untagged_warnings));
2488
2489        let tagged_status = status_frame_with_session(1, Some("session-e"));
2490        assert_eq!(frame_session(&tagged_status), Some("session-e"));
2491        assert!(!frame_is_reliable(&tagged_status));
2492
2493        let project_status = status_frame(2);
2494        assert_eq!(frame_session(&project_status), None);
2495        assert!(!frame_is_reliable(&project_status));
2496
2497        let progress = progress_frame("request-1", ProgressKind::Stdout, "chunk");
2498        assert_eq!(frame_session(&progress), None);
2499        assert!(!frame_is_reliable(&progress));
2500    }
2501
2502    #[test]
2503    fn fan_out_push_frame_routes_session_scoped_and_project_scoped_frames() {
2504        let (_root_dir, root) = test_root("subc-session-routing-root");
2505        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(8);
2506        let identity1 = route_identity(&root, "session-1");
2507        let identity2 = route_identity(&root, "session-2");
2508        let mut routes = HashMap::new();
2509        routes.insert(route_key(1), identity1.clone());
2510        routes.insert(route_key(2), identity2.clone());
2511        let mut root_channels = HashMap::new();
2512        root_channels.insert(root.clone(), HashSet::from([route_key(1), route_key(2)]));
2513        let mut session_identity = HashMap::new();
2514        remember_session_identity(&mut session_identity, &identity1);
2515        remember_session_identity(&mut session_identity, &identity2);
2516        let mut retry_buffer = HashMap::new();
2517        let mut push_buffer = HashMap::new();
2518
2519        let session_result = fan_out_reliable_push_frame(
2520            &writer_tx,
2521            &routes,
2522            &root_channels,
2523            &session_identity,
2524            &mut retry_buffer,
2525            &mut push_buffer,
2526            &root,
2527            &completion_frame_with_session("session-only", "session-1"),
2528        );
2529        assert_eq!(
2530            session_result,
2531            FanOutResult {
2532                matched_channels: 1,
2533                sent_frames: 1,
2534            }
2535        );
2536        assert!(retry_buffer.is_empty());
2537        assert!(push_buffer.is_empty());
2538        let session_push = writer_rx.try_recv().expect("session push queued");
2539        assert_eq!(session_push.header.ty, FrameType::Push);
2540        assert_eq!(session_push.header.channel, 1);
2541        assert!(
2542            writer_rx.try_recv().is_err(),
2543            "session-scoped frame must not broadcast to sibling sessions"
2544        );
2545
2546        let project_result =
2547            fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(9));
2548        assert_eq!(
2549            project_result,
2550            FanOutResult {
2551                matched_channels: 2,
2552                sent_frames: 2,
2553            }
2554        );
2555        let project_channels: HashSet<_> = [
2556            writer_rx
2557                .try_recv()
2558                .expect("first project push")
2559                .header
2560                .channel,
2561            writer_rx
2562                .try_recv()
2563                .expect("second project push")
2564                .header
2565                .channel,
2566        ]
2567        .into_iter()
2568        .collect();
2569        assert_eq!(project_channels, HashSet::from([1, 2]));
2570        assert!(writer_rx.try_recv().is_err());
2571    }
2572
2573    #[test]
2574    fn push_buffer_drops_oldest_per_replay_key() {
2575        let (_root_dir, root) = test_root("subc-buffer-bound-root");
2576        let key = ReplayKey {
2577            root,
2578            harness: "opencode".to_string(),
2579            session: "session-1".to_string(),
2580        };
2581        let mut push_buffer = HashMap::new();
2582        let total = PUSH_BUFFER_MAX_PER_KEY + 3;
2583
2584        for index in 0..total {
2585            buffer_push_frame(
2586                &mut push_buffer,
2587                key.clone(),
2588                completion_frame(&format!("task-{index}")),
2589            );
2590        }
2591
2592        let buffered = push_buffer.get(&key).expect("buffer entry");
2593        assert_eq!(buffered.len(), PUSH_BUFFER_MAX_PER_KEY);
2594        let tasks: Vec<String> = buffered
2595            .iter()
2596            .filter_map(completion_task)
2597            .map(str::to_string)
2598            .collect();
2599        assert_eq!(tasks.first().map(String::as_str), Some("task-3"));
2600        assert_eq!(
2601            tasks.last().map(String::as_str),
2602            Some(format!("task-{}", total - 1).as_str())
2603        );
2604    }
2605
2606    #[test]
2607    fn replay_buffered_push_frames_drains_to_bound_channel() {
2608        let (_root_dir, root) = test_root("subc-buffer-replay-root");
2609        let key = ReplayKey {
2610            root,
2611            harness: "opencode".to_string(),
2612            session: "session-1".to_string(),
2613        };
2614        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);
2615        let mut push_buffer = HashMap::new();
2616        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-a"));
2617        buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-b"));
2618
2619        let replayed =
2620            replay_buffered_push_frames(&writer_tx, route_key(3), &mut push_buffer, &key);
2621
2622        assert_eq!(replayed, 2);
2623        assert!(!push_buffer.contains_key(&key));
2624        for expected_task in ["task-a", "task-b"] {
2625            let frame = writer_rx.try_recv().expect("replayed push");
2626            assert_eq!(frame.header.ty, FrameType::Push);
2627            assert_eq!(frame.header.channel, 3);
2628            let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2629            assert_eq!(body["task_id"].as_str(), Some(expected_task));
2630        }
2631        assert!(writer_rx.try_recv().is_err());
2632    }
2633
2634    #[test]
2635    fn coalesce_push_batch_collapses_lossy_and_preserves_reliable_fifo() {
2636        let (_root_dir, root) = test_root("subc-coalesce-root");
2637        let (_other_dir, other_root) = test_root("subc-coalesce-other");
2638
2639        let output = coalesce_push_batch(vec![
2640            (root.clone(), status_frame(1)),
2641            (root.clone(), completion_frame("task-1")),
2642            (root.clone(), status_frame(2)),
2643            (root.clone(), completion_frame("task-2")),
2644            (root.clone(), long_running_frame("long-task", 100)),
2645            (root.clone(), long_running_frame("long-task", 200)),
2646            (other_root.clone(), status_frame(9)),
2647        ]);
2648
2649        let completion_tasks: Vec<_> = output
2650            .iter()
2651            .filter_map(|(_, frame)| completion_task(frame))
2652            .collect();
2653        assert_eq!(completion_tasks, vec!["task-1", "task-2"]);
2654
2655        let root_statuses: Vec<_> = output
2656            .iter()
2657            .filter(|(output_root, _)| output_root == &root)
2658            .filter_map(|(_, frame)| status_seq(frame))
2659            .collect();
2660        assert_eq!(root_statuses, vec![2]);
2661
2662        let other_statuses: Vec<_> = output
2663            .iter()
2664            .filter(|(output_root, _)| output_root == &other_root)
2665            .filter_map(|(_, frame)| status_seq(frame))
2666            .collect();
2667        assert_eq!(other_statuses, vec![9]);
2668
2669        let long_running_elapsed: Vec<_> = output
2670            .iter()
2671            .filter_map(|(_, frame)| match frame {
2672                PushFrame::BashLongRunning(long_running) => Some(long_running.elapsed_ms),
2673                _ => None,
2674            })
2675            .collect();
2676        assert_eq!(long_running_elapsed, vec![200]);
2677    }
2678
2679    #[test]
2680    fn coalesce_push_batch_keeps_progress_stream_keys_separate() {
2681        let (_root_dir, root) = test_root("subc-progress-coalesce-root");
2682
2683        let output = coalesce_push_batch(vec![
2684            (
2685                root.clone(),
2686                progress_frame("request-1", ProgressKind::Stdout, "old stdout"),
2687            ),
2688            (
2689                root.clone(),
2690                progress_frame("request-1", ProgressKind::Stderr, "stderr"),
2691            ),
2692            (
2693                root.clone(),
2694                progress_frame("request-2", ProgressKind::Stdout, "other stdout"),
2695            ),
2696            (
2697                root.clone(),
2698                progress_frame("request-1", ProgressKind::Stdout, "new stdout"),
2699            ),
2700        ]);
2701
2702        let progress: Vec<_> = output
2703            .iter()
2704            .filter_map(|(_, frame)| match frame {
2705                PushFrame::Progress(progress) => Some((
2706                    progress.request_id.as_str(),
2707                    match progress.kind {
2708                        ProgressKind::Stdout => "stdout",
2709                        ProgressKind::Stderr => "stderr",
2710                    },
2711                    progress.chunk.as_str(),
2712                )),
2713                _ => None,
2714            })
2715            .collect();
2716
2717        assert_eq!(
2718            progress,
2719            vec![
2720                ("request-1", "stderr", "stderr"),
2721                ("request-2", "stdout", "other stdout"),
2722                ("request-1", "stdout", "new stdout"),
2723            ]
2724        );
2725    }
2726
2727    #[test]
2728    fn progress_sender_keeps_reliable_off_saturated_lossy_funnel_without_blocking() {
2729        let (_root_dir, root) = test_root("subc-push-full-root");
2730        let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1);
2731        let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
2732        let sender = progress_sender_for_root(
2733            PushSenders {
2734                lossy_tx,
2735                reliable_tx,
2736            },
2737            root.clone(),
2738        );
2739
2740        let started = Instant::now();
2741        sender(status_frame(1));
2742        sender(status_frame(2));
2743        sender(completion_frame("reliable-after-lossy-full"));
2744        assert!(
2745            started.elapsed() < Duration::from_millis(50),
2746            "saturated push sender must return immediately"
2747        );
2748
2749        let (received_root, received_frame) =
2750            lossy_rx.try_recv().expect("first lossy frame queued");
2751        assert_eq!(received_root, root);
2752        assert_eq!(status_seq(&received_frame), Some(1));
2753        assert!(
2754            lossy_rx.try_recv().is_err(),
2755            "second lossy frame should be dropped"
2756        );
2757
2758        let (reliable_root, reliable_frame) = reliable_rx
2759            .try_recv()
2760            .expect("reliable frame bypasses lossy backpressure");
2761        assert_eq!(reliable_root, root);
2762        assert_eq!(
2763            completion_task(&reliable_frame),
2764            Some("reliable-after-lossy-full")
2765        );
2766        assert!(reliable_rx.try_recv().is_err());
2767    }
2768
2769    #[test]
2770    fn fan_out_lossy_push_frame_drops_when_writer_is_full_without_blocking() {
2771        let (_root_dir, root) = test_root("subc-writer-full-root");
2772        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2773        writer_tx
2774            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2775            .expect("prefill writer queue");
2776
2777        let mut root_channels = HashMap::new();
2778        root_channels.insert(root.clone(), HashSet::from([route_key(7)]));
2779
2780        let routes = HashMap::new();
2781        let started = Instant::now();
2782        let result =
2783            fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(1));
2784        assert!(
2785            started.elapsed() < Duration::from_millis(50),
2786            "saturated writer fan-out must return immediately"
2787        );
2788        assert_eq!(
2789            result,
2790            FanOutResult {
2791                matched_channels: 1,
2792                sent_frames: 0,
2793            }
2794        );
2795
2796        let queued = writer_rx
2797            .try_recv()
2798            .expect("prefilled frame remains queued");
2799        assert_eq!(queued.header.ty, FrameType::Ping);
2800        assert!(
2801            writer_rx.try_recv().is_err(),
2802            "push should be dropped on full writer"
2803        );
2804    }
2805
2806    #[test]
2807    fn reliable_push_backpressure_buffers_and_retries_on_tick() {
2808        let (_root_dir, root) = test_root("subc-retry-buffer-root");
2809        let identity = route_identity(&root, "session-1");
2810        let key = ReplayKey::from_identity(&identity);
2811        let mut routes = HashMap::new();
2812        routes.insert(route_key(9), identity.clone());
2813        let mut root_channels = HashMap::new();
2814        root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
2815        let mut session_identity = HashMap::new();
2816        remember_session_identity(&mut session_identity, &identity);
2817        let mut retry_buffer = HashMap::new();
2818        let mut push_buffer = HashMap::new();
2819        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2820        writer_tx
2821            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2822            .expect("prefill writer queue");
2823
2824        let result = fan_out_reliable_push_frame(
2825            &writer_tx,
2826            &routes,
2827            &root_channels,
2828            &session_identity,
2829            &mut retry_buffer,
2830            &mut push_buffer,
2831            &root,
2832            &completion_frame("retry-task"),
2833        );
2834
2835        assert_eq!(
2836            result,
2837            FanOutResult {
2838                matched_channels: 1,
2839                sent_frames: 0,
2840            }
2841        );
2842        assert!(push_buffer.is_empty());
2843        assert_eq!(retry_buffer.get(&route_key(9)).map(VecDeque::len), Some(1));
2844        assert_eq!(&retry_buffer[&route_key(9)][0].0, &key);
2845
2846        let queued = writer_rx.try_recv().expect("prefilled frame");
2847        assert_eq!(queued.header.ty, FrameType::Ping);
2848        assert_eq!(
2849            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2850            1
2851        );
2852        let retried = writer_rx.try_recv().expect("retried reliable push");
2853        assert_eq!(retried.header.ty, FrameType::Push);
2854        assert_eq!(retried.header.channel, 9);
2855        assert_eq!(push_frame_task_id(&retried).as_deref(), Some("retry-task"));
2856        assert!(!retry_buffer.contains_key(&route_key(9)));
2857    }
2858
2859    #[test]
2860    fn reliable_push_fifo_gates_new_frames_behind_retry_buffer() {
2861        let (_root_dir, root) = test_root("subc-retry-fifo-root");
2862        let identity = route_identity(&root, "session-1");
2863        let mut routes = HashMap::new();
2864        routes.insert(route_key(9), identity.clone());
2865        let mut root_channels = HashMap::new();
2866        root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
2867        let mut session_identity = HashMap::new();
2868        remember_session_identity(&mut session_identity, &identity);
2869        let mut retry_buffer = HashMap::new();
2870        let mut push_buffer = HashMap::new();
2871        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2872        writer_tx
2873            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2874            .expect("prefill writer queue");
2875
2876        let first = completion_frame("fifo-1");
2877        let second = completion_frame("fifo-2");
2878        let _ = fan_out_reliable_push_frame(
2879            &writer_tx,
2880            &routes,
2881            &root_channels,
2882            &session_identity,
2883            &mut retry_buffer,
2884            &mut push_buffer,
2885            &root,
2886            &first,
2887        );
2888        let queued = writer_rx.try_recv().expect("free writer capacity");
2889        assert_eq!(queued.header.ty, FrameType::Ping);
2890
2891        let _ = fan_out_reliable_push_frame(
2892            &writer_tx,
2893            &routes,
2894            &root_channels,
2895            &session_identity,
2896            &mut retry_buffer,
2897            &mut push_buffer,
2898            &root,
2899            &second,
2900        );
2901        assert!(
2902            writer_rx.try_recv().is_err(),
2903            "second reliable frame must not bypass pending retry frame"
2904        );
2905        let queued_tasks: Vec<_> = retry_buffer[&route_key(9)]
2906            .iter()
2907            .filter_map(|(_, frame)| completion_task(frame))
2908            .collect();
2909        assert_eq!(queued_tasks, vec!["fifo-1", "fifo-2"]);
2910
2911        assert_eq!(
2912            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2913            1
2914        );
2915        let first_sent = writer_rx.try_recv().expect("first reliable push");
2916        assert_eq!(push_frame_task_id(&first_sent).as_deref(), Some("fifo-1"));
2917        assert_eq!(
2918            drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2919            1
2920        );
2921        let second_sent = writer_rx.try_recv().expect("second reliable push");
2922        assert_eq!(push_frame_task_id(&second_sent).as_deref(), Some("fifo-2"));
2923        assert!(!retry_buffer.contains_key(&route_key(9)));
2924    }
2925
2926    #[test]
2927    fn replay_buffered_push_frames_drains_incrementally_on_backpressure() {
2928        let (_root_dir, root) = test_root("subc-incremental-replay-root");
2929        let key = ReplayKey {
2930            root,
2931            harness: "opencode".to_string(),
2932            session: "session-1".to_string(),
2933        };
2934        let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(2);
2935        writer_tx
2936            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2937            .expect("prefill writer queue");
2938        let mut push_buffer = HashMap::new();
2939        for task in ["replay-1", "replay-2", "replay-3"] {
2940            buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(task));
2941        }
2942
2943        assert_eq!(
2944            replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
2945            1
2946        );
2947        assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(2));
2948        let remaining: Vec<_> = push_buffer[&key]
2949            .iter()
2950            .filter_map(completion_task)
2951            .collect();
2952        assert_eq!(remaining, vec!["replay-2", "replay-3"]);
2953
2954        let queued = writer_rx.try_recv().expect("prefilled frame");
2955        assert_eq!(queued.header.ty, FrameType::Ping);
2956        let first = writer_rx.try_recv().expect("first replayed push");
2957        assert_eq!(push_frame_task_id(&first).as_deref(), Some("replay-1"));
2958
2959        assert_eq!(
2960            replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
2961            2
2962        );
2963        let second = writer_rx.try_recv().expect("second replayed push");
2964        let third = writer_rx.try_recv().expect("third replayed push");
2965        assert_eq!(push_frame_task_id(&second).as_deref(), Some("replay-2"));
2966        assert_eq!(push_frame_task_id(&third).as_deref(), Some("replay-3"));
2967        assert!(!push_buffer.contains_key(&key));
2968    }
2969
2970    #[test]
2971    fn goodbye_migrates_retry_buffer_into_detach_replay() {
2972        let (_root_dir, root) = test_root("subc-goodbye-migration-root");
2973        let key = ReplayKey {
2974            root,
2975            harness: "opencode".to_string(),
2976            session: "session-1".to_string(),
2977        };
2978        let mut retry_buffer = HashMap::new();
2979        buffer_retry_frame(
2980            &mut retry_buffer,
2981            route_key(5),
2982            key.clone(),
2983            completion_frame("migrated-task"),
2984        );
2985        let mut push_buffer = HashMap::new();
2986
2987        assert_eq!(
2988            migrate_retry_buffer_to_push_buffer(&mut retry_buffer, route_key(5), &mut push_buffer),
2989            1
2990        );
2991
2992        assert!(!retry_buffer.contains_key(&route_key(5)));
2993        assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(1));
2994        assert_eq!(
2995            completion_task(&push_buffer[&key][0]),
2996            Some("migrated-task")
2997        );
2998    }
2999
3000    #[test]
3001    fn permanent_push_send_failure_is_dropped_not_retried_forever() {
3002        let (_root_dir, root) = test_root("subc-permanent-failure-root");
3003        let key = ReplayKey {
3004            root,
3005            harness: "opencode".to_string(),
3006            session: "session-1".to_string(),
3007        };
3008        let (writer_tx, writer_rx) = mpsc::channel::<Frame>(1);
3009        drop(writer_rx);
3010
3011        let mut push_buffer = HashMap::new();
3012        buffer_push_frame(
3013            &mut push_buffer,
3014            key.clone(),
3015            completion_frame("closed-replay"),
3016        );
3017        assert_eq!(
3018            replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
3019            0
3020        );
3021        assert!(!push_buffer.contains_key(&key));
3022
3023        let mut retry_buffer = HashMap::new();
3024        buffer_retry_frame(
3025            &mut retry_buffer,
3026            route_key(4),
3027            key,
3028            completion_frame("closed-retry"),
3029        );
3030        assert_eq!(
3031            drain_retry_buffer_for_channel(&writer_tx, route_key(4), &mut retry_buffer),
3032            0
3033        );
3034        assert!(!retry_buffer.contains_key(&route_key(4)));
3035    }
3036
3037    #[test]
3038    fn completed_task_suppresses_stale_long_running_lossy_push() {
3039        let mut completed_tasks = CompletedTaskIds::default();
3040        assert!(!should_drop_lossy_push(
3041            &completed_tasks,
3042            &long_running_frame("stale-task", 100)
3043        ));
3044
3045        completed_tasks.remember("stale-task");
3046
3047        assert!(should_drop_lossy_push(
3048            &completed_tasks,
3049            &long_running_frame("stale-task", 200)
3050        ));
3051        assert!(!should_drop_lossy_push(
3052            &completed_tasks,
3053            &long_running_frame("other-task", 200)
3054        ));
3055    }
3056
3057    #[tokio::test]
3058    async fn control_send_times_out_when_writer_queue_remains_full() {
3059        let (writer_tx, _writer_rx) = mpsc::channel::<Frame>(1);
3060        writer_tx
3061            .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
3062            .expect("prefill writer queue");
3063        let started = Instant::now();
3064
3065        let result = send_frame(
3066            &writer_tx,
3067            Frame::build(FrameType::Pong, control_flags(), 0, 2, Vec::new()).unwrap(),
3068        )
3069        .await;
3070
3071        assert!(matches!(result, Err(SubcError::WriterBackpressureTimeout)));
3072        assert!(
3073            started.elapsed() < Duration::from_secs(2),
3074            "control send guard should be bounded"
3075        );
3076    }
3077
    /// Tool names the manifest schema test looks up by name; each one must be
    /// present and served with a real (non-placeholder) object schema.
    const CORE_TOOLS: [&str; 8] = [
        "status", "read", "grep", "search", "outline", "inspect", "edit", "write",
    ];
3081
3082    fn is_bare_placeholder_schema(schema: &Value) -> bool {
3083        schema == &json!({ "type": "object" })
3084    }
3085
3086    #[test]
3087    fn build_manifest_serves_embedded_tool_schemas() {
3088        let manifest = build_manifest();
3089        let tools = match manifest.provides.first() {
3090            Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3091            _ => panic!("expected ToolProvider"),
3092        };
3093        let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3094        for name in CORE_TOOLS {
3095            let tool = by_name
3096                .get(name)
3097                .unwrap_or_else(|| panic!("missing tool {name}"));
3098            assert!(
3099                !is_bare_placeholder_schema(&tool.schema),
3100                "{name} must not use bare placeholder schema"
3101            );
3102            assert_eq!(
3103                tool.schema.get("type").and_then(|v| v.as_str()),
3104                Some("object"),
3105                "{name} schema must be an object"
3106            );
3107        }
3108
3109        let read = by_name["read"]
3110            .schema
3111            .get("properties")
3112            .and_then(|p| p.as_object());
3113        let read_props = read.expect("read schema properties");
3114        assert!(
3115            read_props.contains_key("filePath"),
3116            "read schema must expose filePath"
3117        );
3118
3119        let status = &by_name["status"].schema;
3120        assert_eq!(
3121            status.get("properties").and_then(|v| v.as_object()),
3122            Some(&serde_json::Map::new()),
3123            "status schema must have empty properties"
3124        );
3125        assert_eq!(
3126            status.get("additionalProperties").and_then(|v| v.as_bool()),
3127            Some(false),
3128            "status schema must forbid additionalProperties"
3129        );
3130    }
3131
3132    #[test]
3133    fn build_manifest_classifies_execution_mode_by_observable_effect() {
3134        let manifest = build_manifest();
3135        let tools = match manifest.provides.first() {
3136            Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3137            _ => panic!("expected ToolProvider"),
3138        };
3139        let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3140
3141        // Readers warm AFT's own index/cache/symbol artifacts (internal ctx
3142        // mutation), not the user's observable workspace, so they are Pure.
3143        for name in ["status", "read", "grep", "search", "outline", "inspect"] {
3144            assert_eq!(
3145                by_name[name].execution_mode,
3146                ExecutionMode::Pure,
3147                "{name} produces no observable side effect and must be Pure"
3148            );
3149        }
3150        // Only edit/write produce observable file writes -> Mutating.
3151        for name in ["edit", "write"] {
3152            assert_eq!(
3153                by_name[name].execution_mode,
3154                ExecutionMode::Mutating,
3155                "{name} writes files and must be Mutating"
3156            );
3157        }
3158    }
3159}
3160
/// Errors reported by the subc attach path. The `Display` impl renders each
/// variant as a human-readable message.
#[derive(Debug)]
pub enum SubcError {
    /// The tokio runtime for the subc edge could not be built.
    Runtime(std::io::Error),
    /// The subc connection file at `path` could not be read.
    ConnectionFile {
        path: PathBuf,
        source: subc_transport::ConnectionFileError,
    },
    /// The connection file at `path` lists no endpoints.
    NoEndpoint {
        path: PathBuf,
    },
    /// The connection file at `path` contains an endpoint that is not valid.
    InvalidEndpoint {
        path: PathBuf,
        endpoint: String,
    },
    /// Connecting to `endpoint` failed.
    Connect {
        endpoint: String,
        source: std::io::Error,
    },
    /// Authenticating to `endpoint` failed.
    Auth {
        endpoint: String,
        source: subc_transport::AuthError,
    },
    /// A frame I/O error from the transport layer.
    FrameIo(subc_transport::FrameIoError),
    /// A frame could not be built.
    FrameBuild(subc_protocol::FrameBuildError),
    /// The writer task has closed.
    WriterClosed,
    /// The writer task stayed backpressured while a control frame was being sent.
    WriterBackpressureTimeout,
    /// Joining the writer task failed.
    WriterJoin(tokio::task::JoinError),
    /// A JSON error.
    Json(serde_json::Error),
    /// The daemon closed the connection before sending HelloAck.
    ClosedBeforeHelloAck,
    /// The daemon rejected ModuleHello; `body` is `None` when the error body
    /// could not be parsed.
    HelloRejected {
        body: Option<ErrorBody>,
    },
    /// A frame of type `ty` arrived where HelloAck was expected.
    UnexpectedFrame {
        ty: FrameType,
    },
}
3197
3198impl fmt::Display for SubcError {
3199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3200        match self {
3201            Self::Runtime(e) => write!(f, "failed to build subc tokio runtime: {e}"),
3202            Self::ConnectionFile { path, source } => {
3203                write!(f, "failed to read subc connection file {path:?}: {source}")
3204            }
3205            Self::NoEndpoint { path } => {
3206                write!(f, "subc connection file {path:?} has no endpoints")
3207            }
3208            Self::InvalidEndpoint { path, endpoint } => {
3209                write!(
3210                    f,
3211                    "subc connection file {path:?} has invalid endpoint {endpoint}"
3212                )
3213            }
3214            Self::Connect { endpoint, source } => {
3215                write!(f, "failed to connect to subc endpoint {endpoint}: {source}")
3216            }
3217            Self::Auth { endpoint, source } => {
3218                write!(
3219                    f,
3220                    "failed to authenticate to subc endpoint {endpoint}: {source}"
3221                )
3222            }
3223            Self::FrameIo(e) => write!(f, "subc frame I/O error: {e}"),
3224            Self::FrameBuild(e) => write!(f, "subc frame build error: {e}"),
3225            Self::WriterClosed => write!(f, "subc writer task closed"),
3226            Self::WriterBackpressureTimeout => write!(
3227                f,
3228                "subc writer task stayed backpressured while sending a control frame"
3229            ),
3230            Self::WriterJoin(e) => write!(f, "subc writer task join error: {e}"),
3231            Self::Json(e) => write!(f, "subc JSON error: {e}"),
3232            Self::ClosedBeforeHelloAck => {
3233                write!(f, "subc daemon closed the connection before HelloAck")
3234            }
3235            Self::HelloRejected { body } => match body {
3236                Some(b) => write!(f, "subc rejected ModuleHello: {} ({})", b.code, b.message),
3237                None => write!(f, "subc rejected ModuleHello (unparseable error body)"),
3238            },
3239            Self::UnexpectedFrame { ty } => {
3240                write!(f, "subc sent unexpected frame in place of HelloAck: {ty:?}")
3241            }
3242        }
3243    }
3244}
3245
// No `source()` override: the trait's default implementation (returning `None`)
// applies. The `Display` output already embeds the text of any wrapped error.
impl std::error::Error for SubcError {}