1use std::collections::{HashMap, HashSet, VecDeque};
15use std::fmt;
16use std::net::{IpAddr, SocketAddr};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use serde::Deserialize;
22use serde_json::{json, Value};
23
24use crate::config::Config;
25use crate::config_resolve::ConfigTier;
26use crate::context::{App, AppContext, ProgressSender};
27use crate::executor::{Executor, Lane};
28use crate::log_ctx;
29use crate::path_identity::ProjectRootId;
30use crate::protocol::{ProgressKind, PushFrame, RawRequest, Response};
31use crate::runtime_drain;
32
33use subc_protocol::manifest::{
34 Bindings, Concurrency, ExecutionMode, IdentityBinding, IdentityScope, ModuleManifest,
35 ProviderRole, StorageBinding, StorageKind, StorageScope, Tool, TrustTier,
36};
37use subc_protocol::session::{ModuleControlRequest, ModuleControlResponse};
38use subc_protocol::{
39 ErrorBody, Flags, Frame, FrameType, ModuleHelloBody, Priority, PROTOCOL_VERSION,
40};
41use subc_transport::{authenticate_client, connection_file, read_frame, write_frame};
42use tokio::io::{AsyncRead, AsyncWrite};
43use tokio::net::TcpStream;
44use tokio::sync::{mpsc, oneshot, Notify};
45use tokio::task::JoinHandle;
46
47const AUTH_DEADLINE: Duration = Duration::from_secs(5);
51
52const HELLO_CORR: u64 = 1;
54
55const PUSH_BUFFER_MAX_PER_KEY: usize = 256;
58
59const CONTROL_SEND_TIMEOUT: Duration = Duration::from_millis(250);
63
64const COMPLETED_TASK_SUPPRESSION_MAX: usize = 4096;
67
68type RouteChannel = u32;
69type PushEnvelope = (ProjectRootId, PushFrame);
70type RetryBuffer = HashMap<RouteChannel, VecDeque<(ReplayKey, PushFrame)>>;
71
72#[derive(Clone)]
73struct PushSenders {
74 lossy_tx: mpsc::Sender<PushEnvelope>,
75 reliable_tx: mpsc::UnboundedSender<PushEnvelope>,
76}
77
78#[derive(Debug)]
79struct RootMeta {
80 maintenance_pending: bool,
81 last_touched: Instant,
82 diagnostics_on_edit: bool,
83}
84
85#[derive(Debug)]
86struct PendingBind {
87 bind_root_id: ProjectRootId,
88 inserted_new_actor: bool,
89 cancelled: bool,
90}
91
92struct RouteBindCompletion {
93 route_channel: u16,
94 identity: RouteIdentity,
95 bind_root_id: ProjectRootId,
96 inserted_new_actor: bool,
97 configure_response: Response,
98 drain_response: Option<Response>,
99 diagnostics_on_edit: bool,
100 ver: u8,
101 corr: u64,
102 flags: Flags,
103}
104
105#[derive(Debug, Clone)]
106struct RouteIdentity {
107 root: ProjectRootId,
108 project_root: PathBuf,
109 harness: String,
110 session: String,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq, Hash)]
114struct ReplayKey {
115 root: ProjectRootId,
116 harness: String,
117 session: String,
118}
119
120impl ReplayKey {
121 fn from_identity(identity: &RouteIdentity) -> Self {
122 Self {
123 root: identity.root.clone(),
124 harness: identity.harness.clone(),
125 session: identity.session.clone(),
126 }
127 }
128}
129
130#[derive(Debug, Default)]
131struct CompletedTaskIds {
132 order: VecDeque<String>,
133 set: HashSet<String>,
134}
135
136impl CompletedTaskIds {
137 fn remember(&mut self, task_id: &str) {
138 if self.set.contains(task_id) {
139 return;
140 }
141 if self.order.len() >= COMPLETED_TASK_SUPPRESSION_MAX {
142 if let Some(evicted) = self.order.pop_front() {
143 self.set.remove(&evicted);
144 }
145 }
146 let task_id = task_id.to_string();
147 self.order.push_back(task_id.clone());
148 self.set.insert(task_id);
149 }
150
151 fn contains(&self, task_id: &str) -> bool {
152 self.set.contains(task_id)
153 }
154}
155
156impl RootMeta {
157 fn new(now: Instant) -> Self {
158 Self {
159 maintenance_pending: false,
160 last_touched: now,
161 diagnostics_on_edit: false,
162 }
163 }
164
165 fn touch(&mut self) {
166 self.last_touched = Instant::now();
167 }
168}
169
170fn route_key(channel: u16) -> RouteChannel {
171 RouteChannel::from(channel)
172}
173
174fn remove_root_channel(
175 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
176 root: &ProjectRootId,
177 channel: RouteChannel,
178) {
179 let remove_root = if let Some(channels) = root_channels.get_mut(root) {
180 channels.remove(&channel);
181 channels.is_empty()
182 } else {
183 false
184 };
185 if remove_root {
186 root_channels.remove(root);
187 }
188}
189
190fn remove_route_channel(
191 routes: &mut HashMap<RouteChannel, RouteIdentity>,
192 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
193 channel: RouteChannel,
194) -> Option<RouteIdentity> {
195 let removed = routes.remove(&channel);
196 if let Some(identity) = &removed {
197 remove_root_channel(root_channels, &identity.root, channel);
198 }
199 removed
200}
201
202fn insert_route_channel(
203 routes: &mut HashMap<RouteChannel, RouteIdentity>,
204 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
205 channel: RouteChannel,
206 identity: RouteIdentity,
207) {
208 if let Some(previous) = routes.insert(channel, identity.clone()) {
209 remove_root_channel(root_channels, &previous.root, channel);
210 }
211 root_channels
212 .entry(identity.root.clone())
213 .or_default()
214 .insert(channel);
215}
216
217fn remember_session_identity(
218 session_identity: &mut HashMap<(ProjectRootId, String), String>,
219 identity: &RouteIdentity,
220) {
221 session_identity.insert(
226 (identity.root.clone(), identity.session.clone()),
227 identity.harness.clone(),
228 );
229}
230
231fn replay_key_for_session(
232 session_identity: &HashMap<(ProjectRootId, String), String>,
233 root: &ProjectRootId,
234 session: &str,
235) -> Option<ReplayKey> {
236 let harness = session_identity.get(&(root.clone(), session.to_string()))?;
237 Some(ReplayKey {
238 root: root.clone(),
239 harness: harness.clone(),
240 session: session.to_string(),
241 })
242}
243
244fn frame_session(frame: &PushFrame) -> Option<&str> {
245 match frame {
246 PushFrame::BashCompleted(completed) => Some(completed.session_id.as_str()),
247 PushFrame::BashLongRunning(long_running) => Some(long_running.session_id.as_str()),
248 PushFrame::BashPatternMatch(pattern_match) => Some(pattern_match.session_id.as_str()),
249 PushFrame::ConfigureWarnings(warnings) => warnings.session_id.as_deref(),
250 PushFrame::StatusChanged(status) => status.session_id.as_deref(),
251 PushFrame::Progress(_) => None,
252 }
253}
254
255fn frame_is_reliable(frame: &PushFrame) -> bool {
256 matches!(
257 frame,
258 PushFrame::BashCompleted(_)
259 | PushFrame::BashPatternMatch(_)
260 | PushFrame::ConfigureWarnings(_)
261 )
262}
263
264fn completed_task_id(frame: &PushFrame) -> Option<&str> {
265 match frame {
266 PushFrame::BashCompleted(completed) => Some(completed.task_id.as_str()),
267 _ => None,
268 }
269}
270
271fn long_running_task_id(frame: &PushFrame) -> Option<&str> {
272 match frame {
273 PushFrame::BashLongRunning(long_running) => Some(long_running.task_id.as_str()),
274 _ => None,
275 }
276}
277
278fn should_drop_lossy_push(completed_tasks: &CompletedTaskIds, frame: &PushFrame) -> bool {
279 long_running_task_id(frame).is_some_and(|task_id| completed_tasks.contains(task_id))
280}
281
282fn progress_sender_for_root(push_senders: PushSenders, root_id: ProjectRootId) -> ProgressSender {
283 Arc::new(Box::new(move |frame: PushFrame| {
284 if frame_is_reliable(&frame) {
289 let _ = push_senders.reliable_tx.send((root_id.clone(), frame));
290 } else {
291 let _ = push_senders.lossy_tx.try_send((root_id.clone(), frame));
292 }
293 }))
294}
295
296#[derive(Debug, Clone, PartialEq, Eq, Hash)]
297enum LossyProgressKind {
298 Stdout,
299 Stderr,
300}
301
302impl From<&ProgressKind> for LossyProgressKind {
303 fn from(kind: &ProgressKind) -> Self {
304 match kind {
305 ProgressKind::Stdout => Self::Stdout,
306 ProgressKind::Stderr => Self::Stderr,
307 }
308 }
309}
310
311#[derive(Debug, Clone, PartialEq, Eq, Hash)]
312enum LossyPushKey {
313 Progress {
314 request_id: String,
315 kind: LossyProgressKind,
316 },
317 StatusChanged,
318 BashLongRunning {
319 task_id: String,
320 },
321}
322
323fn lossy_push_key(frame: &PushFrame) -> Option<LossyPushKey> {
324 match frame {
325 PushFrame::Progress(progress) => Some(LossyPushKey::Progress {
326 request_id: progress.request_id.clone(),
327 kind: LossyProgressKind::from(&progress.kind),
328 }),
329 PushFrame::StatusChanged(_) => Some(LossyPushKey::StatusChanged),
330 PushFrame::BashLongRunning(long_running) => Some(LossyPushKey::BashLongRunning {
331 task_id: long_running.task_id.clone(),
332 }),
333 PushFrame::BashCompleted(_)
334 | PushFrame::BashPatternMatch(_)
335 | PushFrame::ConfigureWarnings(_) => None,
336 }
337}
338
339fn coalesce_push_batch(batch: Vec<(ProjectRootId, PushFrame)>) -> Vec<(ProjectRootId, PushFrame)> {
340 let mut slots: Vec<Option<(ProjectRootId, PushFrame)>> = Vec::with_capacity(batch.len());
341 let mut latest_lossy: HashMap<(ProjectRootId, LossyPushKey), usize> = HashMap::new();
342
343 for (root, frame) in batch {
344 if let Some(lossy_key) = lossy_push_key(&frame) {
345 let map_key = (root.clone(), lossy_key);
346 if let Some(previous_index) = latest_lossy.insert(map_key, slots.len()) {
347 slots[previous_index] = None;
348 }
349 }
350 slots.push(Some((root, frame)));
351 }
352
353 slots.into_iter().flatten().collect()
354}
355
356#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
357struct FanOutResult {
358 matched_channels: usize,
362 sent_frames: usize,
365}
366
367#[derive(Debug, Clone, Copy, PartialEq, Eq)]
368enum PushSendOutcome {
369 Sent,
370 Backpressure,
371 PermanentFailure,
372}
373
374fn try_send_push_body(
375 writer_tx: &mpsc::Sender<Frame>,
376 channel: RouteChannel,
377 body: &[u8],
378) -> PushSendOutcome {
379 let Ok(route_channel) = u16::try_from(channel) else {
380 log::warn!("subc attach: invalid route channel {channel} for Push fan-out");
381 return PushSendOutcome::PermanentFailure;
382 };
383 let push_frame = match Frame::build_with_version(
384 PROTOCOL_VERSION,
385 FrameType::Push,
386 control_flags(),
387 route_channel,
388 0,
389 body.to_vec(),
390 ) {
391 Ok(frame) => frame,
392 Err(error) => {
393 log::warn!("subc attach: failed to build Push frame: {error}");
394 return PushSendOutcome::PermanentFailure;
395 }
396 };
397 match writer_tx.try_send(push_frame) {
398 Ok(()) => PushSendOutcome::Sent,
399 Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
400 Err(mpsc::error::TrySendError::Closed(_)) => {
401 log::warn!("subc attach: writer closed while sending Push frame");
402 PushSendOutcome::PermanentFailure
403 }
404 }
405}
406
407fn try_send_push_frame(
408 writer_tx: &mpsc::Sender<Frame>,
409 channel: RouteChannel,
410 frame: &PushFrame,
411) -> PushSendOutcome {
412 let body = match serde_json::to_vec(frame) {
413 Ok(body) => body,
414 Err(error) => {
415 log::warn!("subc attach: failed to serialize PushFrame: {error}");
416 return PushSendOutcome::PermanentFailure;
417 }
418 };
419 try_send_push_body(writer_tx, channel, &body)
420}
421
422fn bounded_push_back<T>(queue: &mut VecDeque<T>, item: T) {
423 if queue.len() >= PUSH_BUFFER_MAX_PER_KEY {
424 queue.pop_front();
425 }
426 queue.push_back(item);
427}
428
429fn buffer_push_frame(
430 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
431 key: ReplayKey,
432 frame: PushFrame,
433) {
434 bounded_push_back(push_buffer.entry(key).or_default(), frame);
435}
436
437fn buffer_retry_frame(
438 retry_buffer: &mut RetryBuffer,
439 channel: RouteChannel,
440 key: ReplayKey,
441 frame: PushFrame,
442) {
443 bounded_push_back(retry_buffer.entry(channel).or_default(), (key, frame));
444}
445
446fn migrate_retry_buffer_to_push_buffer(
447 retry_buffer: &mut RetryBuffer,
448 channel: RouteChannel,
449 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
450) -> usize {
451 let Some(frames) = retry_buffer.remove(&channel) else {
452 return 0;
453 };
454 let migrated = frames.len();
455 for (key, frame) in frames {
456 buffer_push_frame(push_buffer, key, frame);
457 }
458 migrated
459}
460
461fn replay_buffered_push_frames(
462 writer_tx: &mpsc::Sender<Frame>,
463 channel: RouteChannel,
464 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
465 key: &ReplayKey,
466) -> usize {
467 let mut sent = 0;
468 let remove_empty;
469
470 {
471 let Some(queue) = push_buffer.get_mut(key) else {
472 return 0;
473 };
474
475 while let Some(frame) = queue.pop_front() {
476 match try_send_push_frame(writer_tx, channel, &frame) {
477 PushSendOutcome::Sent => sent += 1,
478 PushSendOutcome::Backpressure => {
479 queue.push_front(frame);
480 break;
481 }
482 PushSendOutcome::PermanentFailure => {
483 log::warn!(
484 "subc attach: dropping buffered reliable Push for root {} harness {} session {} after permanent send failure",
485 key.root.as_path().display(),
486 key.harness,
487 key.session
488 );
489 }
490 }
491 }
492
493 remove_empty = queue.is_empty();
494 }
495
496 if remove_empty {
497 push_buffer.remove(key);
498 }
499
500 sent
501}
502
503fn drain_retry_buffer_for_channel(
504 writer_tx: &mpsc::Sender<Frame>,
505 channel: RouteChannel,
506 retry_buffer: &mut RetryBuffer,
507) -> usize {
508 let mut sent = 0;
509 let remove_empty;
510
511 {
512 let Some(queue) = retry_buffer.get_mut(&channel) else {
513 return 0;
514 };
515
516 while let Some((key, frame)) = queue.pop_front() {
517 match try_send_push_frame(writer_tx, channel, &frame) {
518 PushSendOutcome::Sent => sent += 1,
519 PushSendOutcome::Backpressure => {
520 queue.push_front((key, frame));
521 break;
522 }
523 PushSendOutcome::PermanentFailure => {
524 log::warn!(
525 "subc attach: dropping retry-buffered reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
526 key.root.as_path().display(),
527 key.harness,
528 key.session
529 );
530 }
531 }
532 }
533
534 remove_empty = queue.is_empty();
535 }
536
537 if remove_empty {
538 retry_buffer.remove(&channel);
539 }
540
541 sent
542}
543
544fn drain_retry_buffers_for_bound_routes(
545 writer_tx: &mpsc::Sender<Frame>,
546 routes: &HashMap<RouteChannel, RouteIdentity>,
547 retry_buffer: &mut RetryBuffer,
548) -> usize {
549 let channels: Vec<RouteChannel> = routes.keys().copied().collect();
550 channels
551 .into_iter()
552 .map(|channel| drain_retry_buffer_for_channel(writer_tx, channel, retry_buffer))
553 .sum()
554}
555
556fn matching_route_channels(
557 routes: &HashMap<RouteChannel, RouteIdentity>,
558 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
559 root: &ProjectRootId,
560 frame: &PushFrame,
561) -> Vec<RouteChannel> {
562 let Some(channels) = root_channels.get(root) else {
563 return Vec::new();
564 };
565
566 let session = frame_session(frame);
567 channels
568 .iter()
569 .copied()
570 .filter(|channel| match session {
571 Some(session) => routes
572 .get(channel)
573 .is_some_and(|identity| identity.session == session),
574 None => true,
575 })
576 .collect()
577}
578
579fn buffer_detached_reliable_push_frame(
580 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
581 session_identity: &HashMap<(ProjectRootId, String), String>,
582 root: &ProjectRootId,
583 frame: &PushFrame,
584) {
585 let Some(session) = frame_session(frame) else {
586 log::warn!(
587 "subc attach: dropping reliable project-scoped Push for root {} because no route is bound",
588 root.as_path().display()
589 );
590 return;
591 };
592
593 if let Some(key) = replay_key_for_session(session_identity, root, session) {
594 buffer_push_frame(push_buffer, key, frame.clone());
595 } else {
596 log::warn!(
597 "subc attach: dropping reliable Push for root {} session {} because no retained harness identity is known",
598 root.as_path().display(),
599 session
600 );
601 }
602}
603
604fn fan_out_lossy_push_frame(
605 writer_tx: &mpsc::Sender<Frame>,
606 routes: &HashMap<RouteChannel, RouteIdentity>,
607 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
608 root: &ProjectRootId,
609 frame: &PushFrame,
610) -> FanOutResult {
611 let matching_channels = matching_route_channels(routes, root_channels, root, frame);
612 let matched_channels = matching_channels.len();
613 if matched_channels == 0 {
614 return FanOutResult::default();
615 }
616
617 let body = match serde_json::to_vec(frame) {
618 Ok(body) => body,
619 Err(error) => {
620 log::warn!("subc attach: failed to serialize PushFrame for fan-out: {error}");
621 return FanOutResult {
622 matched_channels,
623 sent_frames: 0,
624 };
625 }
626 };
627
628 let sent_frames = matching_channels
629 .into_iter()
630 .filter(|&channel| {
631 matches!(
632 try_send_push_body(writer_tx, channel, &body),
633 PushSendOutcome::Sent
634 )
635 })
636 .count();
637
638 FanOutResult {
639 matched_channels,
640 sent_frames,
641 }
642}
643
644fn fan_out_reliable_push_frame(
645 writer_tx: &mpsc::Sender<Frame>,
646 routes: &HashMap<RouteChannel, RouteIdentity>,
647 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
648 session_identity: &HashMap<(ProjectRootId, String), String>,
649 retry_buffer: &mut RetryBuffer,
650 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
651 root: &ProjectRootId,
652 frame: &PushFrame,
653) -> FanOutResult {
654 let matching_channels = matching_route_channels(routes, root_channels, root, frame);
655 let matched_channels = matching_channels.len();
656 if matched_channels == 0 {
657 buffer_detached_reliable_push_frame(push_buffer, session_identity, root, frame);
658 return FanOutResult::default();
659 }
660
661 let mut sent_frames = 0;
662 for channel in matching_channels {
663 let Some(identity) = routes.get(&channel) else {
664 log::warn!(
665 "subc attach: dropping reliable Push for stale route channel {channel} with no route identity"
666 );
667 continue;
668 };
669 let key = ReplayKey::from_identity(identity);
670
671 if retry_buffer
672 .get(&channel)
673 .is_some_and(|queue| !queue.is_empty())
674 {
675 buffer_retry_frame(retry_buffer, channel, key, frame.clone());
676 continue;
677 }
678
679 match try_send_push_frame(writer_tx, channel, frame) {
680 PushSendOutcome::Sent => sent_frames += 1,
681 PushSendOutcome::Backpressure => {
682 buffer_retry_frame(retry_buffer, channel, key, frame.clone());
683 }
684 PushSendOutcome::PermanentFailure => {
685 log::warn!(
686 "subc attach: dropping reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
687 key.root.as_path().display(),
688 key.harness,
689 key.session
690 );
691 }
692 }
693 }
694
695 FanOutResult {
696 matched_channels,
697 sent_frames,
698 }
699}
700
701fn process_reliable_push_frame(
702 writer_tx: &mpsc::Sender<Frame>,
703 routes: &HashMap<RouteChannel, RouteIdentity>,
704 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
705 session_identity: &HashMap<(ProjectRootId, String), String>,
706 retry_buffer: &mut RetryBuffer,
707 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
708 completed_tasks: &mut CompletedTaskIds,
709 root: ProjectRootId,
710 frame: PushFrame,
711) {
712 if let Some(task_id) = completed_task_id(&frame) {
713 completed_tasks.remember(task_id);
714 }
715 let _ = fan_out_reliable_push_frame(
716 writer_tx,
717 routes,
718 root_channels,
719 session_identity,
720 retry_buffer,
721 push_buffer,
722 &root,
723 &frame,
724 );
725}
726
727fn process_lossy_push_frame(
728 writer_tx: &mpsc::Sender<Frame>,
729 routes: &HashMap<RouteChannel, RouteIdentity>,
730 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
731 completed_tasks: &CompletedTaskIds,
732 root: ProjectRootId,
733 frame: PushFrame,
734) {
735 if should_drop_lossy_push(completed_tasks, &frame) {
736 if let Some(task_id) = long_running_task_id(&frame) {
737 log::debug!(
738 "subc attach: dropping stale BashLongRunning Push for completed task {task_id}"
739 );
740 }
741 return;
742 }
743
744 let _ = fan_out_lossy_push_frame(writer_tx, routes, root_channels, &root, &frame);
745}
746
747pub type DispatchFn = fn(RawRequest, &AppContext) -> Response;
750
751pub fn run_subc_mode(
756 connection_file_path: &Path,
757 ctx: Arc<AppContext>,
758 executor: Arc<Executor>,
759 dispatch: DispatchFn,
760 user_config_path: Option<PathBuf>,
761) -> Result<(), SubcError> {
762 run_subc_mode_inner(
766 connection_file_path,
767 ctx,
768 executor,
769 dispatch,
770 user_config_path,
771 false,
772 )
773}
774
775fn run_subc_mode_inner(
776 connection_file_path: &Path,
777 ctx: Arc<AppContext>,
778 executor: Arc<Executor>,
779 dispatch: DispatchFn,
780 user_config_path: Option<PathBuf>,
781 allow_native_passthrough: bool,
782) -> Result<(), SubcError> {
783 let runtime = tokio::runtime::Builder::new_current_thread()
784 .enable_all()
785 .build()
786 .map_err(SubcError::Runtime)?;
787
788 let executor_for_loop = Arc::clone(&executor);
789 let loop_result = runtime.block_on(async move {
790 let shared_app = ctx.app();
791 drop(ctx);
792 let stream = connect_and_authenticate(connection_file_path).await?;
793 log::info!(
794 "subc attach: authenticated to daemon via {}",
795 connection_file_path.display()
796 );
797 let (read_half, write_half) = tokio::io::split(stream);
798 run_module_loop(
799 read_half,
800 write_half,
801 shared_app,
802 executor_for_loop,
803 dispatch,
804 user_config_path,
805 allow_native_passthrough,
806 )
807 .await
808 });
809
810 for actor_ctx in executor.actor_contexts() {
811 actor_ctx.lsp().shutdown_all();
812 actor_ctx.bash_background().detach();
813 }
814
815 loop_result
816}
817
818#[doc(hidden)]
823pub fn run_subc_mode_for_test(
824 connection_file_path: &Path,
825 ctx: Arc<AppContext>,
826 executor: Arc<Executor>,
827 dispatch: DispatchFn,
828 user_config_path: Option<PathBuf>,
829) -> Result<(), SubcError> {
830 run_subc_mode_inner(
831 connection_file_path,
832 ctx,
833 executor,
834 dispatch,
835 user_config_path,
836 true,
837 )
838}
839
840async fn connect_and_authenticate(connection_file_path: &Path) -> Result<TcpStream, SubcError> {
843 let conn = connection_file::read(connection_file_path).map_err(|source| {
844 SubcError::ConnectionFile {
845 path: connection_file_path.to_path_buf(),
846 source,
847 }
848 })?;
849
850 let endpoint = conn
851 .endpoints
852 .first()
853 .ok_or_else(|| SubcError::NoEndpoint {
854 path: connection_file_path.to_path_buf(),
855 })?;
856 let endpoint_label = format!("{}:{}", endpoint.host, endpoint.port);
857 let ip = endpoint
858 .host
859 .parse::<IpAddr>()
860 .map_err(|_| SubcError::InvalidEndpoint {
861 path: connection_file_path.to_path_buf(),
862 endpoint: endpoint_label.clone(),
863 })?;
864 let addr = SocketAddr::new(ip, endpoint.port);
865
866 let mut stream = TcpStream::connect(addr)
867 .await
868 .map_err(|source| SubcError::Connect {
869 endpoint: endpoint_label.clone(),
870 source,
871 })?;
872
873 authenticate_client(&mut stream, &conn, AUTH_DEADLINE)
874 .await
875 .map_err(|source| SubcError::Auth {
876 endpoint: endpoint_label,
877 source,
878 })?;
879
880 Ok(stream)
881}
882
883async fn run_module_loop<R, W>(
887 mut read: R,
888 mut write: W,
889 shared_app: Arc<App>,
890 executor: Arc<Executor>,
891 dispatch: DispatchFn,
892 user_config_path: Option<PathBuf>,
893 allow_native_passthrough: bool,
894) -> Result<(), SubcError>
895where
896 R: AsyncRead + Unpin + Send + 'static,
897 W: AsyncWrite + Unpin + Send + 'static,
898{
899 let hello = ModuleHelloBody {
903 manifest: build_manifest(),
904 protocol_ver: PROTOCOL_VERSION,
905 control_ops: None,
906 launch_nonce: std::env::var("SUBC_LAUNCH_NONCE").ok(),
907 };
908 let hello_frame = Frame::build(
909 FrameType::Hello,
910 control_flags(),
911 0,
912 HELLO_CORR,
913 serde_json::to_vec(&hello).map_err(SubcError::Json)?,
914 )
915 .map_err(SubcError::FrameBuild)?;
916 write_frame(&mut write, &hello_frame)
917 .await
918 .map_err(SubcError::FrameIo)?;
919
920 match read_frame(&mut read).await.map_err(SubcError::FrameIo)? {
922 None => return Err(SubcError::ClosedBeforeHelloAck),
923 Some(frame) => match frame.header.ty {
924 FrameType::HelloAck => {
925 log::info!("subc attach: registered (HelloAck received)");
926 }
927 FrameType::Error => {
928 let body = serde_json::from_slice::<ErrorBody>(&frame.body).ok();
929 return Err(SubcError::HelloRejected { body });
930 }
931 other => return Err(SubcError::UnexpectedFrame { ty: other }),
932 },
933 }
934
935 let (writer_tx, writer_rx) = mpsc::channel::<Frame>(256);
936 let writer_task = spawn_writer_task(write, writer_rx);
937 let (reader_tx, mut reader_rx) = mpsc::channel::<Result<Frame, SubcError>>(256);
944 let reader_task = spawn_reader_task(read, reader_tx);
945 let shutdown = Arc::new(Notify::new());
946 let mut drain_interval = tokio::time::interval(Duration::from_millis(250));
947 let (maintenance_tx, mut maintenance_rx) = mpsc::channel::<(ProjectRootId, Response)>(256);
948 let (control_completion_tx, mut control_completion_rx) =
949 mpsc::channel::<RouteBindCompletion>(256);
950 let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1024);
951 let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
952 let push_senders = PushSenders {
953 lossy_tx,
954 reliable_tx,
955 };
956 let mut routes: HashMap<RouteChannel, RouteIdentity> = HashMap::new();
957 let mut root_channels: HashMap<ProjectRootId, HashSet<RouteChannel>> = HashMap::new();
958 let mut session_identity: HashMap<(ProjectRootId, String), String> = HashMap::new();
959 let mut push_buffer: HashMap<ReplayKey, VecDeque<PushFrame>> = HashMap::new();
960 let mut retry_buffer: RetryBuffer = HashMap::new();
961 let mut completed_tasks = CompletedTaskIds::default();
962 let mut live_roots: HashMap<ProjectRootId, RootMeta> = HashMap::new();
963 let mut pending_binds: HashMap<RouteChannel, PendingBind> = HashMap::new();
964
965 let loop_result: Result<(), SubcError> = loop {
966 tokio::select! {
967 _ = shutdown.notified() => {
968 log::warn!("subc attach: fatal executor response requested teardown");
969 break Ok(());
970 }
971 maybe_frame = reader_rx.recv() => {
972 let frame = match maybe_frame {
973 None => {
974 log::info!("subc attach: daemon closed connection");
975 break Ok(());
976 }
977 Some(Err(error)) => break Err(error),
978 Some(Ok(frame)) => frame,
979 };
980
981 match frame.header.ty {
982 FrameType::Ping if frame.header.channel == 0 => {
983 let pong = match Frame::build_with_version(
984 frame.header.ver,
985 FrameType::Pong,
986 frame.header.flags,
987 0,
988 frame.header.corr,
989 Vec::new(),
990 ) {
991 Ok(pong) => pong,
992 Err(error) => break Err(SubcError::FrameBuild(error)),
993 };
994 if let Err(error) = send_frame(&writer_tx, pong).await {
995 break Err(error);
996 }
997 }
998 FrameType::Goodbye if frame.header.channel == 0 => {
999 log::info!("subc attach: received channel-0 Goodbye");
1000 break Ok(());
1001 }
1002 FrameType::Goodbye => {
1003 let channel = route_key(frame.header.channel);
1004 if let Some(pending) = pending_binds.get_mut(&channel) {
1005 pending.cancelled = true;
1006 log::debug!(
1007 "subc attach: cancelled pending RouteBind for route {} on Goodbye",
1008 frame.header.channel
1009 );
1010 }
1011 let migrated = migrate_retry_buffer_to_push_buffer(
1012 &mut retry_buffer,
1013 channel,
1014 &mut push_buffer,
1015 );
1016 if let Some(identity) = remove_route_channel(&mut routes, &mut root_channels, channel) {
1017 if migrated > 0 {
1018 log::debug!(
1019 "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from route {} into detach replay",
1020 frame.header.channel
1021 );
1022 }
1023 if let Some(meta) = live_roots.get_mut(&identity.root) {
1024 let idle_for = meta.last_touched.elapsed();
1025 meta.touch();
1026 log::debug!(
1027 "subc attach: route {} torn down for root {} harness {} session {} (last touched {:?} ago)",
1028 frame.header.channel,
1029 identity.root.as_path().display(),
1030 identity.harness,
1031 identity.session,
1032 idle_for
1033 );
1034 } else {
1035 log::debug!(
1036 "subc attach: route {} torn down for root {} harness {} session {}",
1037 frame.header.channel,
1038 identity.root.as_path().display(),
1039 identity.harness,
1040 identity.session
1041 );
1042 }
1043 } else {
1044 if migrated > 0 {
1045 log::debug!(
1046 "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from unbound route {} into detach replay",
1047 frame.header.channel
1048 );
1049 }
1050 log::debug!("subc attach: unbound route {} torn down", frame.header.channel);
1051 }
1052 }
1053 FrameType::Request if frame.header.channel == 0 => {
1054 if let Err(error) = handle_control_request(
1055 &writer_tx,
1056 &frame,
1057 &shared_app,
1058 &executor,
1059 &mut live_roots,
1060 &mut pending_binds,
1061 &control_completion_tx,
1062 &push_senders,
1063 dispatch,
1064 user_config_path.as_deref(),
1065 )
1066 .await
1067 {
1068 break Err(error);
1069 }
1070 }
1071 FrameType::Request => {
1072 if let Err(error) = handle_tool_call(
1073 &writer_tx,
1074 &frame,
1075 &routes,
1076 &pending_binds,
1077 &mut live_roots,
1078 &executor,
1079 &shutdown,
1080 dispatch,
1081 allow_native_passthrough,
1082 )
1083 .await
1084 {
1085 break Err(error);
1086 }
1087 }
1088 _ => {}
1092 }
1093 }
1094 Some((root_id, frame)) = reliable_rx.recv() => {
1095 let mut batch = vec![(root_id, frame)];
1099 while let Ok(item) = reliable_rx.try_recv() {
1100 batch.push(item);
1101 }
1102
1103 for (root, frame) in batch {
1104 process_reliable_push_frame(
1105 &writer_tx,
1106 &routes,
1107 &root_channels,
1108 &session_identity,
1109 &mut retry_buffer,
1110 &mut push_buffer,
1111 &mut completed_tasks,
1112 root,
1113 frame,
1114 );
1115 }
1116 }
1117 Some((root_id, frame)) = lossy_rx.recv() => {
1118 while let Ok((reliable_root, reliable_frame)) = reliable_rx.try_recv() {
1122 process_reliable_push_frame(
1123 &writer_tx,
1124 &routes,
1125 &root_channels,
1126 &session_identity,
1127 &mut retry_buffer,
1128 &mut push_buffer,
1129 &mut completed_tasks,
1130 reliable_root,
1131 reliable_frame,
1132 );
1133 }
1134
1135 let mut batch = vec![(root_id, frame)];
1139 while let Ok(item) = lossy_rx.try_recv() {
1140 batch.push(item);
1141 }
1142
1143 for (root, frame) in coalesce_push_batch(batch) {
1144 process_lossy_push_frame(
1145 &writer_tx,
1146 &routes,
1147 &root_channels,
1148 &completed_tasks,
1149 root,
1150 frame,
1151 );
1152 }
1153 }
1154 Some(completion) = control_completion_rx.recv() => {
1155 if let Err(error) = handle_route_bind_completion(
1156 &writer_tx,
1157 completion,
1158 &mut routes,
1159 &mut root_channels,
1160 &mut session_identity,
1161 &mut push_buffer,
1162 &mut live_roots,
1163 &mut pending_binds,
1164 &executor,
1165 &shutdown,
1166 )
1167 .await
1168 {
1169 break Err(error);
1170 }
1171 }
1172 Some((root_id, response)) = maintenance_rx.recv() => {
1173 if let Some(meta) = live_roots.get_mut(&root_id) {
1174 meta.maintenance_pending = false;
1175 }
1176 if response_is_internal_error(&response) {
1177 signal_fatal_teardown(&writer_tx, None, PROTOCOL_VERSION, 0, &shutdown).await;
1178 }
1179 }
1180 _ = drain_interval.tick() => {
1181 let retried = drain_retry_buffers_for_bound_routes(
1182 &writer_tx,
1183 &routes,
1184 &mut retry_buffer,
1185 );
1186 if retried > 0 {
1187 log::debug!(
1188 "subc attach: retried {retried} reliable Push frame(s) after writer backpressure"
1189 );
1190 }
1191
1192 let due_roots: Vec<ProjectRootId> = live_roots
1193 .iter_mut()
1194 .filter_map(|(root_id, meta)| {
1195 if meta.maintenance_pending {
1196 None
1197 } else {
1198 meta.maintenance_pending = true;
1199 Some(root_id.clone())
1200 }
1201 })
1202 .collect();
1203 for root_id in due_roots {
1204 submit_maintenance_drain(&executor, root_id, &maintenance_tx);
1205 }
1206 }
1207 }
1208 };
1209
1210 reader_task.abort();
1213 drop(writer_tx);
1214 let writer_result = finish_writer_task(writer_task).await;
1215 loop_result.and(writer_result)
1216}
1217
1218fn spawn_writer_task<W>(
1219 mut write: W,
1220 mut rx: mpsc::Receiver<Frame>,
1221) -> JoinHandle<Result<(), subc_transport::FrameIoError>>
1222where
1223 W: AsyncWrite + Unpin + Send + 'static,
1224{
1225 tokio::spawn(async move {
1226 while let Some(frame) = rx.recv().await {
1227 write_frame(&mut write, &frame).await?;
1228 }
1229 Ok(())
1230 })
1231}
1232
1233fn spawn_reader_task<R>(mut read: R, tx: mpsc::Sender<Result<Frame, SubcError>>) -> JoinHandle<()>
1240where
1241 R: AsyncRead + Unpin + Send + 'static,
1242{
1243 tokio::spawn(async move {
1244 loop {
1245 match read_frame(&mut read).await {
1246 Ok(Some(frame)) => {
1247 if tx.send(Ok(frame)).await.is_err() {
1248 return;
1249 }
1250 }
1251 Ok(None) => {
1252 return;
1254 }
1255 Err(error) => {
1256 let _ = tx.send(Err(SubcError::FrameIo(error))).await;
1257 return;
1258 }
1259 }
1260 }
1261 })
1262}
1263
1264async fn finish_writer_task(
1265 mut writer_task: JoinHandle<Result<(), subc_transport::FrameIoError>>,
1266) -> Result<(), SubcError> {
1267 match tokio::time::timeout(Duration::from_millis(100), &mut writer_task).await {
1268 Ok(Ok(Ok(()))) => Ok(()),
1269 Ok(Ok(Err(error))) => Err(SubcError::FrameIo(error)),
1270 Ok(Err(error)) => Err(SubcError::WriterJoin(error)),
1271 Err(_) => {
1272 writer_task.abort();
1273 Ok(())
1274 }
1275 }
1276}
1277
1278async fn send_frame(tx: &mpsc::Sender<Frame>, frame: Frame) -> Result<(), SubcError> {
1279 match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(frame)).await {
1280 Ok(Ok(())) => Ok(()),
1281 Ok(Err(_)) => Err(SubcError::WriterClosed),
1282 Err(_) => Err(SubcError::WriterBackpressureTimeout),
1283 }
1284}
1285
1286fn rollback_pending_bind_actor(
1287 executor: &Arc<Executor>,
1288 live_roots: &HashMap<ProjectRootId, RootMeta>,
1289 root_id: &ProjectRootId,
1290 inserted_new_actor: bool,
1291) {
1292 if inserted_new_actor && !live_roots.contains_key(root_id) {
1293 executor.remove_actor(root_id);
1294 }
1295}
1296
1297async fn handle_route_bind_completion(
1298 tx: &mpsc::Sender<Frame>,
1299 completion: RouteBindCompletion,
1300 routes: &mut HashMap<RouteChannel, RouteIdentity>,
1301 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
1302 session_identity: &mut HashMap<(ProjectRootId, String), String>,
1303 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1304 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1305 pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1306 executor: &Arc<Executor>,
1307 shutdown: &Arc<Notify>,
1308) -> Result<(), SubcError> {
1309 let route_id = route_key(completion.route_channel);
1310 let Some(pending) = pending_binds.remove(&route_id) else {
1311 log::warn!(
1312 "subc attach: dropping RouteBind completion for non-pending route {}",
1313 completion.route_channel
1314 );
1315 rollback_pending_bind_actor(
1316 executor,
1317 live_roots,
1318 &completion.bind_root_id,
1319 completion.inserted_new_actor,
1320 );
1321 return Ok(());
1322 };
1323
1324 if pending.bind_root_id != completion.bind_root_id {
1325 log::warn!(
1326 "subc attach: pending RouteBind root mismatch for route {} (pending {} completion {})",
1327 completion.route_channel,
1328 pending.bind_root_id.as_path().display(),
1329 completion.bind_root_id.as_path().display()
1330 );
1331 }
1332
1333 let inserted_new_actor = pending.inserted_new_actor || completion.inserted_new_actor;
1334 if pending.cancelled {
1335 rollback_pending_bind_actor(
1336 executor,
1337 live_roots,
1338 &completion.bind_root_id,
1339 inserted_new_actor,
1340 );
1341 log::debug!(
1342 "subc attach: discarded completed RouteBind for cancelled route {} root {}",
1343 completion.route_channel,
1344 completion.bind_root_id.as_path().display()
1345 );
1346 return Ok(());
1347 }
1348
1349 let failure = if !completion.configure_response.success {
1350 Some((
1351 &completion.configure_response,
1352 "configure failed during route bind",
1353 ))
1354 } else if let Some(drain_response) = completion.drain_response.as_ref() {
1355 if drain_response.success {
1356 None
1357 } else {
1358 Some((
1359 drain_response,
1360 "build-completion drain failed during route bind",
1361 ))
1362 }
1363 } else {
1364 None
1365 };
1366
1367 if let Some((response, fallback)) = failure {
1368 rollback_pending_bind_actor(
1369 executor,
1370 live_roots,
1371 &completion.bind_root_id,
1372 inserted_new_actor,
1373 );
1374 let message = response_message(response, fallback);
1375 let fatal = response_is_internal_error(response);
1376 send_route_bind_error_parts(
1377 tx,
1378 completion.ver,
1379 completion.corr,
1380 completion.flags,
1381 "config_divergence",
1382 &message,
1383 )
1384 .await?;
1385 if fatal {
1386 signal_fatal_teardown(
1387 tx,
1388 Some(completion.route_channel),
1389 completion.ver,
1390 completion.corr,
1391 shutdown,
1392 )
1393 .await;
1394 }
1395 return Ok(());
1396 }
1397
1398 remember_session_identity(session_identity, &completion.identity);
1399 let replay_key = ReplayKey::from_identity(&completion.identity);
1400 insert_route_channel(routes, root_channels, route_id, completion.identity);
1401 live_roots
1402 .entry(completion.bind_root_id.clone())
1403 .and_modify(|meta| {
1404 meta.touch();
1405 meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1406 })
1407 .or_insert_with(|| RootMeta::new(Instant::now()));
1408 if let Some(meta) = live_roots.get_mut(&completion.bind_root_id) {
1409 meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1410 }
1411
1412 let ack =
1413 serde_json::to_vec(&ModuleControlResponse::RouteBindAck {}).map_err(SubcError::Json)?;
1414 let response = Frame::build_with_version(
1415 completion.ver,
1416 FrameType::Response,
1417 control_flags(),
1418 0,
1419 completion.corr,
1420 ack,
1421 )
1422 .map_err(SubcError::FrameBuild)?;
1423 send_frame(tx, response).await?;
1424 let replayed = replay_buffered_push_frames(tx, route_id, push_buffer, &replay_key);
1425 if replayed > 0 {
1426 log::debug!(
1427 "subc attach: replayed {} buffered Push frame(s) to route {} root {} harness {} session {}",
1428 replayed,
1429 completion.route_channel,
1430 replay_key.root.as_path().display(),
1431 replay_key.harness,
1432 replay_key.session
1433 );
1434 }
1435 log::info!(
1436 "subc attach: route {} bound to root {}",
1437 completion.route_channel,
1438 completion.bind_root_id.as_path().display()
1439 );
1440 Ok(())
1441}
1442
1443async fn handle_control_request(
1448 tx: &mpsc::Sender<Frame>,
1449 frame: &Frame,
1450 shared_app: &Arc<App>,
1451 executor: &Arc<Executor>,
1452 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1453 pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1454 control_completion_tx: &mpsc::Sender<RouteBindCompletion>,
1455 push_senders: &PushSenders,
1456 dispatch: DispatchFn,
1457 user_config_path: Option<&Path>,
1458) -> Result<(), SubcError> {
1459 let request =
1460 serde_json::from_slice::<ModuleControlRequest>(&frame.body).map_err(SubcError::Json)?;
1461 match request {
1462 ModuleControlRequest::RouteBind {
1463 route_channel,
1464 target: _,
1465 identity,
1466 ..
1471 } => {
1472 let route_id = route_key(route_channel);
1473 if pending_binds.contains_key(&route_id) {
1474 return send_route_bind_error(
1475 tx,
1476 frame,
1477 "config_divergence",
1478 "route bind is already pending for channel",
1479 )
1480 .await;
1481 }
1482
1483 let bind_root_id = match ProjectRootId::from_path(&identity.project_root) {
1484 Ok(root_id) => root_id,
1485 Err(error) => {
1486 return send_route_bind_error(
1487 tx,
1488 frame,
1489 "config_divergence",
1490 &format!("invalid route project root: {error}"),
1491 )
1492 .await;
1493 }
1494 };
1495
1496 let request_id = format!("subc-bind-{route_channel}");
1499 let bind_project_root = identity.project_root.clone();
1500 let bind_harness = identity.harness.clone();
1501 let bind_session = identity.session.clone();
1502
1503 let local_tiers = crate::subc_config::read_local_cortexkit_config_tiers(
1515 user_config_path,
1516 Path::new(&bind_project_root),
1517 );
1518 let config_tiers: Vec<Value> = local_tiers
1519 .iter()
1520 .map(|t| json!({ "tier": t.tier, "source": t.source, "doc": t.doc }))
1521 .collect();
1522 let diagnostics_on_edit = diagnostics_on_edit_from_tiers(&local_tiers);
1523 let configure_json = json!({
1524 "id": request_id,
1525 "command": "configure",
1526 "project_root": bind_project_root,
1527 "harness": bind_harness,
1528 "session_id": bind_session.clone(),
1529 "config": config_tiers,
1530 });
1531 let configure_req = match serde_json::from_value::<RawRequest>(configure_json) {
1532 Ok(req) => req,
1533 Err(error) => {
1534 return send_route_bind_error(
1535 tx,
1536 frame,
1537 "config_divergence",
1538 &format!("failed to build configure request: {error}"),
1539 )
1540 .await;
1541 }
1542 };
1543
1544 let route_identity = RouteIdentity {
1545 root: bind_root_id.clone(),
1546 project_root: PathBuf::from(&bind_project_root),
1547 harness: bind_harness.clone(),
1548 session: bind_session.clone(),
1549 };
1550 let configure_session = route_identity.session.clone();
1551 let root_was_live = live_roots.contains_key(&bind_root_id);
1552 let inserted_new_actor = if root_was_live {
1553 log::debug!(
1554 "subc attach: reusing actor for route {} root {}",
1555 route_channel,
1556 bind_root_id.as_path().display()
1557 );
1558 false
1559 } else {
1560 let actor_ctx = Arc::new(AppContext::from_app(
1561 Arc::clone(shared_app),
1562 Config::default(),
1563 ));
1564 install_bash_compressor(&actor_ctx);
1565 actor_ctx.set_progress_sender(Some(progress_sender_for_root(
1566 push_senders.clone(),
1567 bind_root_id.clone(),
1568 )));
1569 let inserted =
1570 executor.register_actor(bind_root_id.clone(), Arc::clone(&actor_ctx));
1571 drop(actor_ctx);
1572 log::debug!(
1576 "subc attach: registered actor for route {} root {}",
1577 route_channel,
1578 bind_root_id.as_path().display()
1579 );
1580 inserted
1581 };
1582
1583 pending_binds.insert(
1584 route_id,
1585 PendingBind {
1586 bind_root_id: bind_root_id.clone(),
1587 inserted_new_actor,
1588 cancelled: false,
1589 },
1590 );
1591
1592 let configure_request_id = configure_req.id.clone();
1593 let configure_rx = executor.submit_async(
1594 bind_root_id.clone(),
1595 Lane::Mutating,
1596 configure_request_id.clone(),
1597 Box::new(move |ctx| {
1598 log_ctx::with_session(Some(configure_session.clone()), || {
1599 dispatch(configure_req, ctx)
1600 })
1601 }),
1602 );
1603
1604 let completion_tx = control_completion_tx.clone();
1605 let completion_executor = Arc::clone(executor);
1606 let completion_identity = route_identity;
1607 let completion_root = bind_root_id.clone();
1608 let completion_route_channel = route_channel;
1609 let completion_ver = frame.header.ver;
1610 let completion_corr = frame.header.corr;
1611 let completion_flags = frame.header.flags;
1612 tokio::spawn(async move {
1613 let configure_response =
1614 await_executor_response(configure_rx, configure_request_id.clone()).await;
1615 let drain_response = if configure_response.success && !root_was_live {
1616 let drain_request_id = format!("subc-bind-drain-{completion_route_channel}");
1617 let drain_response_id = drain_request_id.clone();
1618 let drain_rx = completion_executor.submit_async(
1619 completion_root.clone(),
1620 Lane::Mutating,
1621 drain_request_id.clone(),
1622 Box::new(move |ctx| {
1623 runtime_drain::drain_build_completions(ctx);
1624 Response::success(drain_response_id, json!({ "drained": true }))
1625 }),
1626 );
1627 Some(await_executor_response(drain_rx, drain_request_id).await)
1628 } else {
1629 None
1630 };
1631
1632 let completion = RouteBindCompletion {
1633 route_channel: completion_route_channel,
1634 identity: completion_identity,
1635 bind_root_id: completion_root,
1636 inserted_new_actor,
1637 configure_response,
1638 drain_response,
1639 diagnostics_on_edit,
1640 ver: completion_ver,
1641 corr: completion_corr,
1642 flags: completion_flags,
1643 };
1644 if completion_tx.send(completion).await.is_err() {
1645 log::debug!(
1646 "subc attach: dropped RouteBind completion for route {} after loop exit",
1647 completion_route_channel
1648 );
1649 }
1650 });
1651
1652 Ok(())
1653 }
1654 }
1655}
1656
1657fn install_bash_compressor(ctx: &AppContext) {
1658 let filter_registry_handle = ctx.shared_filter_registry();
1660 let compress_flag = ctx.bash_compress_flag();
1661 ctx.bash_background().set_compressor_with_exit_code(
1662 move |command: &str, output: String, exit_code: Option<i32>| {
1663 if !compress_flag.load(std::sync::atomic::Ordering::Relaxed) {
1664 return crate::compress::CompressionResult::new(output);
1665 }
1666 let registry_guard = match filter_registry_handle.read() {
1667 Ok(g) => g,
1668 Err(poisoned) => poisoned.into_inner(),
1669 };
1670 crate::compress::compress_with_registry_exit_code(
1671 command,
1672 &output,
1673 exit_code,
1674 ®istry_guard,
1675 )
1676 },
1677 );
1678}
1679
1680fn diagnostics_on_edit_from_tiers(tiers: &[ConfigTier]) -> bool {
1681 let mut diagnostics_on_edit = false;
1682 for tier in tiers {
1683 if let Some(value) = diagnostics_on_edit_from_doc(&tier.doc) {
1684 diagnostics_on_edit = value;
1685 }
1686 }
1687 diagnostics_on_edit
1688}
1689
1690fn diagnostics_on_edit_from_doc(doc: &str) -> Option<bool> {
1691 let stripped = strip_jsonc_for_subc(doc);
1692 let value = serde_json::from_str::<Value>(&stripped).ok()?;
1693 value
1694 .get("lsp")
1695 .and_then(Value::as_object)?
1696 .get("diagnostics_on_edit")
1697 .and_then(Value::as_bool)
1698}
1699
1700fn strip_jsonc_for_subc(source: &str) -> String {
1701 strip_trailing_commas_for_subc(&strip_jsonc_comments_for_subc(source))
1702}
1703
1704fn strip_jsonc_comments_for_subc(source: &str) -> String {
1705 let mut output = String::with_capacity(source.len());
1706 let mut chars = source.chars().peekable();
1707 let mut in_string = false;
1708 let mut escaped = false;
1709
1710 while let Some(ch) = chars.next() {
1711 if in_string {
1712 output.push(ch);
1713 if escaped {
1714 escaped = false;
1715 } else if ch == '\\' {
1716 escaped = true;
1717 } else if ch == '"' {
1718 in_string = false;
1719 }
1720 continue;
1721 }
1722
1723 if ch == '"' {
1724 in_string = true;
1725 output.push(ch);
1726 continue;
1727 }
1728
1729 if ch == '/' {
1730 match chars.peek().copied() {
1731 Some('/') => {
1732 chars.next();
1733 for next in chars.by_ref() {
1734 if next == '\n' {
1735 output.push('\n');
1736 break;
1737 }
1738 }
1739 }
1740 Some('*') => {
1741 chars.next();
1742 let mut previous = '\0';
1743 for next in chars.by_ref() {
1744 if next == '\n' {
1745 output.push('\n');
1746 }
1747 if previous == '*' && next == '/' {
1748 break;
1749 }
1750 previous = next;
1751 }
1752 }
1753 _ => output.push(ch),
1754 }
1755 continue;
1756 }
1757
1758 output.push(ch);
1759 }
1760
1761 output
1762}
1763
1764fn strip_trailing_commas_for_subc(source: &str) -> String {
1765 let chars = source.chars().collect::<Vec<_>>();
1766 let mut output = String::with_capacity(source.len());
1767 let mut index = 0usize;
1768 let mut in_string = false;
1769 let mut escaped = false;
1770
1771 while index < chars.len() {
1772 let ch = chars[index];
1773 if in_string {
1774 output.push(ch);
1775 if escaped {
1776 escaped = false;
1777 } else if ch == '\\' {
1778 escaped = true;
1779 } else if ch == '"' {
1780 in_string = false;
1781 }
1782 index += 1;
1783 continue;
1784 }
1785
1786 if ch == '"' {
1787 in_string = true;
1788 output.push(ch);
1789 index += 1;
1790 continue;
1791 }
1792
1793 if ch == ',' {
1794 let mut next = index + 1;
1795 while next < chars.len() && chars[next].is_whitespace() {
1796 next += 1;
1797 }
1798 if next < chars.len() && matches!(chars[next], '}' | ']') {
1799 index += 1;
1800 continue;
1801 }
1802 }
1803
1804 output.push(ch);
1805 index += 1;
1806 }
1807
1808 output
1809}
1810
1811async fn send_route_bind_error(
1812 tx: &mpsc::Sender<Frame>,
1813 frame: &Frame,
1814 code: &str,
1815 message: &str,
1816) -> Result<(), SubcError> {
1817 send_route_bind_error_parts(
1818 tx,
1819 frame.header.ver,
1820 frame.header.corr,
1821 frame.header.flags,
1822 code,
1823 message,
1824 )
1825 .await
1826}
1827
1828async fn send_route_bind_error_parts(
1829 tx: &mpsc::Sender<Frame>,
1830 ver: u8,
1831 corr: u64,
1832 flags: Flags,
1833 code: &str,
1834 message: &str,
1835) -> Result<(), SubcError> {
1836 let response = build_error_frame(ver, 0, corr, flags, code, message)?;
1837 send_frame(tx, response).await?;
1838 log::warn!("subc attach: route bind rejected ({code}): {message}");
1839 Ok(())
1840}
1841
1842async fn handle_tool_call(
1847 tx: &mpsc::Sender<Frame>,
1848 frame: &Frame,
1849 routes: &HashMap<RouteChannel, RouteIdentity>,
1850 pending_binds: &HashMap<RouteChannel, PendingBind>,
1851 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1852 executor: &Arc<Executor>,
1853 shutdown: &Arc<Notify>,
1854 dispatch: DispatchFn,
1855 allow_native_passthrough: bool,
1856) -> Result<(), SubcError> {
1857 let route_id = route_key(frame.header.channel);
1858 if pending_binds.contains_key(&route_id) {
1859 let error = build_error_frame(
1860 frame.header.ver,
1861 frame.header.channel,
1862 frame.header.corr,
1863 frame.header.flags,
1864 "route_not_bound",
1865 "route is not bound before tool call",
1866 )?;
1867 return send_frame(tx, error).await;
1868 }
1869
1870 let Some(identity) = routes.get(&route_id).cloned() else {
1871 let error = build_error_frame(
1872 frame.header.ver,
1873 frame.header.channel,
1874 frame.header.corr,
1875 frame.header.flags,
1876 "route_not_bound",
1877 "route is not bound before tool call",
1878 )?;
1879 return send_frame(tx, error).await;
1880 };
1881 if let Some(meta) = live_roots.get_mut(&identity.root) {
1882 meta.touch();
1883 }
1884
1885 let call = serde_json::from_slice::<ToolCallRequest>(&frame.body).map_err(SubcError::Json)?;
1886 let bare_name = call.name.clone();
1887 let format_context = crate::subc_format::FormatContext::from_tool_call(
1888 &bare_name,
1889 &call.arguments,
1890 identity.project_root.as_path(),
1891 );
1892
1893 let request_id = format!("subc-{}-{}", frame.header.channel, frame.header.corr);
1894 let project_root = identity.project_root.as_path();
1895 let diagnostics_on_edit = live_roots
1896 .get(&identity.root)
1897 .map(|meta| meta.diagnostics_on_edit)
1898 .unwrap_or(false);
1899 let translate_context = crate::subc_translate::TranslateContext {
1900 diagnostics_on_edit,
1901 };
1902 let (command, translated_args) = match crate::subc_translate::subc_translate_with_context(
1903 &call.name,
1904 &call.arguments,
1905 project_root,
1906 translate_context,
1907 ) {
1908 Ok(t) => (t.command, t.args),
1909 Err(err) if is_subc_agent_core_tool(&call.name) => {
1912 let response = Response::error(request_id.clone(), err.code, err.message);
1913 let response_frame = build_tool_response_frame(
1914 frame.header.ver,
1915 frame.header.channel,
1916 frame.header.corr,
1917 frame.header.flags,
1918 &bare_name,
1919 &format_context,
1920 &response,
1921 )?;
1922 return send_frame(tx, response_frame).await;
1923 }
1924 Err(_) if !allow_native_passthrough => {
1932 log::warn!(
1933 "subc tool call: rejecting non-manifest tool name {:?} on route {} (fail-closed)",
1934 call.name,
1935 frame.header.channel
1936 );
1937 let response = Response::error(
1938 request_id.clone(),
1939 "unknown_tool",
1940 format!("tool {:?} is not in the AFT tool manifest", call.name),
1941 );
1942 let response_frame = build_tool_response_frame(
1943 frame.header.ver,
1944 frame.header.channel,
1945 frame.header.corr,
1946 frame.header.flags,
1947 &bare_name,
1948 &format_context,
1949 &response,
1950 )?;
1951 return send_frame(tx, response_frame).await;
1952 }
1953 Err(_) => {
1956 let map = call.arguments.as_object().cloned().unwrap_or_default();
1957 (call.name.clone(), map)
1958 }
1959 };
1960
1961 let lane = command_lane(&command);
1962 let command_for_finalize = command.clone();
1963 let session_for_finalize = identity.session.clone();
1964 let mut map = translated_args;
1965 map.insert("id".to_string(), json!(request_id.clone()));
1966 map.insert("command".to_string(), json!(command));
1967 map.insert("session_id".to_string(), json!(identity.session.clone()));
1968
1969 let raw_req = match serde_json::from_value::<RawRequest>(Value::Object(map)) {
1970 Ok(req) => req,
1971 Err(error) => {
1972 let response = Response::error(
1973 request_id.clone(),
1974 "invalid_request",
1975 format!("failed to build request from tool call: {error}"),
1976 );
1977 let response_frame = build_tool_response_frame(
1978 frame.header.ver,
1979 frame.header.channel,
1980 frame.header.corr,
1981 frame.header.flags,
1982 &bare_name,
1983 &format_context,
1984 &response,
1985 )?;
1986 return send_frame(tx, response_frame).await;
1987 }
1988 };
1989
1990 let bare_name_for_frame = bare_name.clone();
1991 let format_context_for_frame = format_context;
1992 let rx = executor.submit_async(
1993 identity.root,
1994 lane,
1995 request_id.clone(),
1996 Box::new(move |ctx| {
1997 log_ctx::with_session(Some(session_for_finalize.clone()), || {
1998 let mut response = dispatch(raw_req, ctx);
1999 crate::response_finalize::attach_bg_completions(
2000 &mut response,
2001 ctx,
2002 &session_for_finalize,
2003 &command_for_finalize,
2004 );
2005 crate::response_finalize::attach_status_bar(
2006 &mut response,
2007 ctx,
2008 &command_for_finalize,
2009 );
2010 response
2011 })
2012 }),
2013 );
2014 let completion_tx = tx.clone();
2015 let completion_shutdown = Arc::clone(shutdown);
2016 let route_channel = frame.header.channel;
2017 let corr = frame.header.corr;
2018 let flags = frame.header.flags;
2019 let ver = frame.header.ver;
2020 let is_mutating = lane == Lane::Mutating;
2021 tokio::spawn(async move {
2022 let response = await_executor_response(rx, request_id.clone()).await;
2023 let fatal = is_mutating && response_is_internal_error(&response);
2024 match build_tool_response_frame(
2025 ver,
2026 route_channel,
2027 corr,
2028 flags,
2029 &bare_name_for_frame,
2030 &format_context_for_frame,
2031 &response,
2032 ) {
2033 Ok(response_frame) => {
2034 let _ = completion_tx.send(response_frame).await;
2035 }
2036 Err(error) => {
2037 log::error!("subc attach: failed to build tool response frame: {error}");
2038 }
2039 }
2040 if fatal {
2041 signal_fatal_teardown(
2042 &completion_tx,
2043 Some(route_channel),
2044 ver,
2045 corr,
2046 &completion_shutdown,
2047 )
2048 .await;
2049 }
2050 });
2051 Ok(())
2052}
2053
2054fn submit_maintenance_drain(
2055 executor: &Arc<Executor>,
2056 root_id: ProjectRootId,
2057 completion_tx: &mpsc::Sender<(ProjectRootId, Response)>,
2058) {
2059 let request_id = format!(
2060 "subc-maintenance-drain-{}",
2061 root_id.as_path().to_string_lossy()
2062 );
2063 let response_id = request_id.clone();
2064 let completion_root_id = root_id.clone();
2065 let rx = executor.submit_async(
2066 root_id,
2067 Lane::Mutating,
2068 request_id.clone(),
2069 Box::new(move |ctx| {
2070 runtime_drain::drain_configure_warning_events(ctx);
2071 runtime_drain::drain_search_index_events(ctx);
2072 runtime_drain::drain_callgraph_store_events(ctx);
2073 runtime_drain::drain_semantic_index_events(ctx);
2074 runtime_drain::drain_semantic_refresh_events(ctx);
2075 runtime_drain::drain_inspect_events(ctx);
2076 runtime_drain::drain_watcher_events(ctx);
2077 runtime_drain::drain_lsp_events(ctx);
2078 Response::success(response_id, json!({ "drained": true }))
2079 }),
2080 );
2081 let completion_tx = completion_tx.clone();
2082 tokio::spawn(async move {
2083 let response = await_executor_response(rx, request_id).await;
2084 let _ = completion_tx.send((completion_root_id, response)).await;
2085 });
2086}
2087
2088async fn await_executor_response(rx: oneshot::Receiver<Response>, request_id: String) -> Response {
2089 rx.await
2090 .unwrap_or_else(|_| Response::error(request_id, "internal_error", "executor dropped"))
2091}
2092
2093fn build_tool_response_frame(
2094 ver: u8,
2095 route_channel: u16,
2096 corr: u64,
2097 flags: Flags,
2098 bare_name: &str,
2099 format_context: &crate::subc_format::FormatContext,
2100 response: &Response,
2101) -> Result<Frame, SubcError> {
2102 let text =
2103 crate::subc_format::format_response_with_context(bare_name, response, format_context);
2104 let is_error = !response.success;
2105 let result = json!({
2106 "content": [{ "type": "text", "text": text }],
2107 "isError": is_error,
2108 });
2109 let body = serde_json::to_vec(&result).map_err(SubcError::Json)?;
2110
2111 Frame::build_with_version(ver, FrameType::Response, flags, route_channel, corr, body)
2112 .map_err(SubcError::FrameBuild)
2113}
2114
2115fn build_error_frame(
2116 ver: u8,
2117 channel: u16,
2118 corr: u64,
2119 flags: Flags,
2120 code: &str,
2121 message: &str,
2122) -> Result<Frame, SubcError> {
2123 let body = serde_json::to_vec(&ErrorBody {
2124 code: code.to_string(),
2125 message: message.to_string(),
2126 })
2127 .map_err(SubcError::Json)?;
2128 Frame::build_with_version(ver, FrameType::Error, flags, channel, corr, body)
2129 .map_err(SubcError::FrameBuild)
2130}
2131
2132fn build_goodbye_frame(ver: u8, channel: u16, corr: u64) -> Result<Frame, SubcError> {
2133 Frame::build_with_version(
2134 ver,
2135 FrameType::Goodbye,
2136 control_flags(),
2137 channel,
2138 corr,
2139 Vec::new(),
2140 )
2141 .map_err(SubcError::FrameBuild)
2142}
2143
2144async fn signal_fatal_teardown(
2145 tx: &mpsc::Sender<Frame>,
2146 route_channel: Option<u16>,
2147 ver: u8,
2148 corr: u64,
2149 shutdown: &Arc<Notify>,
2150) {
2151 if let Some(route_channel) = route_channel {
2152 if let Ok(frame) = build_goodbye_frame(ver, route_channel, corr) {
2153 if let Err(error) = send_frame(tx, frame).await {
2154 log::warn!(
2155 "subc attach: failed to queue fatal route Goodbye for route {route_channel}: {error}"
2156 );
2157 }
2158 }
2159 }
2160 if let Ok(frame) = build_goodbye_frame(ver, 0, 0) {
2161 if let Err(error) = send_frame(tx, frame).await {
2162 log::warn!("subc attach: failed to queue fatal channel-0 Goodbye: {error}");
2163 }
2164 }
2165 shutdown.notify_one();
2166}
2167
2168fn response_message(response: &Response, fallback: &str) -> String {
2169 response
2170 .data
2171 .get("message")
2172 .and_then(Value::as_str)
2173 .map(ToOwned::to_owned)
2174 .unwrap_or_else(|| fallback.to_string())
2175}
2176
2177fn response_is_internal_error(response: &Response) -> bool {
2178 !response.success && response.data.get("code").and_then(Value::as_str) == Some("internal_error")
2179}
2180
2181fn is_subc_agent_core_tool(name: &str) -> bool {
2182 matches!(
2183 name,
2184 "status" | "read" | "write" | "edit" | "grep" | "search" | "outline" | "inspect"
2185 )
2186}
2187
2188fn command_lane(command: &str) -> Lane {
2189 match command {
2190 "ping"
2191 | "version"
2192 | "echo"
2193 | "bash_drain_completions"
2194 | "bash_regex_match"
2195 | "db_get_state"
2196 | "db_get_host_state"
2197 | "read"
2198 | "undo_preview"
2199 | "edit_history"
2200 | "checkpoint_paths"
2201 | "list_checkpoints"
2202 | "glob"
2203 | "grep"
2204 | "git_conflicts"
2205 | "ast_search" => Lane::PureRead,
2206
2207 "bash_status" | "outline" | "zoom" => Lane::PureRead,
2211
2212 "status"
2213 | "inspect"
2214 | "lsp_diagnostics"
2215 | "lsp_inspect"
2216 | "lsp_hover"
2217 | "lsp_goto_definition"
2218 | "lsp_find_references"
2219 | "lsp_prepare_rename" => Lane::SerialLspStatus,
2220
2221 "semantic_search" | "search" | "callers" | "impact" | "call_tree" | "trace_to"
2222 | "trace_to_symbol" | "trace_data" | "inspect_tier2_run" => Lane::HeavyInit,
2223
2224 "bash"
2225 | "bash_ack_completions"
2226 | "bash_notify"
2227 | "bash_unnotify"
2228 | "bash_promote"
2229 | "bash_kill"
2230 | "bash_write"
2231 | "db_set_state"
2232 | "db_set_host_state"
2233 | "undo"
2234 | "checkpoint"
2235 | "restore_checkpoint"
2236 | "write"
2237 | "delete_file"
2238 | "move_file"
2239 | "edit"
2240 | "edit_symbol"
2241 | "edit_match"
2242 | "batch"
2243 | "add_import"
2244 | "remove_import"
2245 | "organize_imports"
2246 | "configure"
2247 | "move_symbol"
2248 | "extract_function"
2249 | "inline_symbol"
2250 | "ast_replace"
2251 | "lsp_rename"
2252 | "list_filters"
2253 | "trust_filter_project"
2254 | "untrust_filter_project"
2255 | "snapshot" => Lane::Mutating,
2256
2257 _ => Lane::Mutating,
2258 }
2259}
2260
2261#[derive(Debug, Deserialize)]
2262struct ToolCallRequest {
2263 name: String,
2264 #[serde(default)]
2265 arguments: Value,
2266}
2267
2268static SUBC_TOOL_SCHEMAS: LazyLock<serde_json::Map<String, Value>> = LazyLock::new(|| {
2269 serde_json::from_str(include_str!("subc_tool_schemas.json"))
2270 .unwrap_or_else(|e| panic!("subc_tool_schemas.json: {e}"))
2271});
2272
2273fn tool_schema(name: &str) -> Value {
2274 SUBC_TOOL_SCHEMAS.get(name).cloned().unwrap_or_else(|| {
2275 log::warn!(
2276 "subc build_manifest: missing embedded schema for tool {name:?}; using placeholder"
2277 );
2278 json!({ "type": "object" })
2279 })
2280}
2281
2282fn build_manifest() -> ModuleManifest {
2287 let tool = |name: &str, execution_mode: ExecutionMode| Tool {
2288 name: name.to_string(),
2289 execution_mode,
2290 schema: tool_schema(name),
2291 };
2292 ModuleManifest {
2299 module_id: "aft".to_string(),
2300 module_version: env!("CARGO_PKG_VERSION").to_string(),
2301 protocol_ver: PROTOCOL_VERSION,
2302 trust_tier: TrustTier::FirstParty,
2303 provides: vec![ProviderRole::ToolProvider {
2304 tools: vec![
2305 tool("status", ExecutionMode::Pure),
2306 tool("read", ExecutionMode::Pure),
2307 tool("grep", ExecutionMode::Pure),
2308 tool("search", ExecutionMode::Pure),
2309 tool("outline", ExecutionMode::Pure),
2310 tool("inspect", ExecutionMode::Pure),
2311 tool("edit", ExecutionMode::Mutating),
2312 tool("write", ExecutionMode::Mutating),
2313 ],
2314 identity_scope: vec![IdentityScope::Session, IdentityScope::Project],
2315 concurrency: Concurrency::ModuleManaged,
2316 emits_push: true,
2317 sub_supervises: true,
2318 }],
2319 consumes: Vec::new(),
2320 scheduled_tasks: Vec::new(),
2321 bindings: Bindings {
2322 storage: StorageBinding {
2323 kind: StorageKind::Sqlite,
2324 scope: StorageScope::Project,
2325 owns_schema: true,
2326 },
2327 vault_grants: Vec::new(),
2328 identity: IdentityBinding {
2329 requires: vec![IdentityScope::Project],
2330 optional: vec![IdentityScope::Session],
2331 },
2332 },
2333 }
2334}
2335
2336fn control_flags() -> Flags {
2337 Flags::new(false, Priority::Passive, false)
2338}
2339
2340#[cfg(test)]
2341mod tests {
2342 use super::*;
2343 use crate::bash_background::BgTaskStatus;
2344 use crate::protocol::{
2345 BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, ConfigureWarningsFrame,
2346 ProgressFrame, StatusChangedFrame,
2347 };
2348 use serde_json::json;
2349
2350 fn test_root(name: &str) -> (tempfile::TempDir, ProjectRootId) {
2351 let dir = tempfile::Builder::new()
2352 .prefix(name)
2353 .tempdir()
2354 .expect("temp root");
2355 let root = ProjectRootId::from_path(dir.path()).expect("project root id");
2356 (dir, root)
2357 }
2358
2359 fn status_frame(seq: u64) -> PushFrame {
2360 status_frame_with_session(seq, None)
2361 }
2362
2363 fn status_frame_with_session(seq: u64, session_id: Option<&str>) -> PushFrame {
2364 PushFrame::StatusChanged(StatusChangedFrame {
2365 frame_type: "status_changed",
2366 session_id: session_id.map(str::to_string),
2367 snapshot: json!({ "seq": seq }),
2368 })
2369 }
2370
2371 fn completion_frame(task_id: &str) -> PushFrame {
2372 completion_frame_with_session(task_id, "session-1")
2373 }
2374
2375 fn completion_frame_with_session(task_id: &str, session_id: &str) -> PushFrame {
2376 PushFrame::BashCompleted(BashCompletedFrame {
2377 frame_type: "bash_completed",
2378 task_id: task_id.to_string(),
2379 session_id: session_id.to_string(),
2380 status: BgTaskStatus::Completed,
2381 exit_code: Some(0),
2382 command: format!("echo {task_id}"),
2383 output_preview: String::new(),
2384 output_truncated: false,
2385 original_tokens: None,
2386 compressed_tokens: None,
2387 tokens_skipped: false,
2388 })
2389 }
2390
2391 fn long_running_frame(task_id: &str, elapsed_ms: u64) -> PushFrame {
2392 long_running_frame_with_session(task_id, "session-1", elapsed_ms)
2393 }
2394
2395 fn long_running_frame_with_session(
2396 task_id: &str,
2397 session_id: &str,
2398 elapsed_ms: u64,
2399 ) -> PushFrame {
2400 PushFrame::BashLongRunning(BashLongRunningFrame {
2401 frame_type: "bash_long_running",
2402 task_id: task_id.to_string(),
2403 session_id: session_id.to_string(),
2404 command: format!("sleep {elapsed_ms}"),
2405 elapsed_ms,
2406 })
2407 }
2408
2409 fn pattern_match_frame(session_id: &str) -> PushFrame {
2410 PushFrame::BashPatternMatch(BashPatternMatchFrame {
2411 frame_type: "bash_pattern_match",
2412 task_id: "task-pattern".to_string(),
2413 session_id: session_id.to_string(),
2414 watch_id: "watch-1".to_string(),
2415 match_text: "needle".to_string(),
2416 match_offset: 7,
2417 context: "haystack needle".to_string(),
2418 once: true,
2419 reason: "pattern_match",
2420 })
2421 }
2422
2423 fn configure_warnings_frame(session_id: Option<&str>) -> PushFrame {
2424 PushFrame::ConfigureWarnings(ConfigureWarningsFrame {
2425 frame_type: "configure_warnings",
2426 session_id: session_id.map(str::to_string),
2427 project_root: "/tmp/subc-test".to_string(),
2428 source_file_count: 0,
2429 warnings: Vec::new(),
2430 })
2431 }
2432
2433 fn route_identity(root: &ProjectRootId, session_id: &str) -> RouteIdentity {
2434 RouteIdentity {
2435 root: root.clone(),
2436 project_root: root.as_path().to_path_buf(),
2437 harness: "opencode".to_string(),
2438 session: session_id.to_string(),
2439 }
2440 }
2441
2442 fn progress_frame(request_id: &str, kind: ProgressKind, chunk: &str) -> PushFrame {
2443 PushFrame::Progress(ProgressFrame::new(request_id, kind, chunk))
2444 }
2445
2446 fn status_seq(frame: &PushFrame) -> Option<u64> {
2447 match frame {
2448 PushFrame::StatusChanged(status) => status.snapshot.get("seq").and_then(|v| v.as_u64()),
2449 _ => None,
2450 }
2451 }
2452
2453 fn completion_task(frame: &PushFrame) -> Option<&str> {
2454 match frame {
2455 PushFrame::BashCompleted(completion) => Some(completion.task_id.as_str()),
2456 _ => None,
2457 }
2458 }
2459
2460 fn push_frame_task_id(frame: &Frame) -> Option<String> {
2461 let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2462 body.get("task_id")
2463 .and_then(serde_json::Value::as_str)
2464 .map(str::to_string)
2465 }
2466
2467 #[test]
2468 fn frame_classification_matches_push_delivery_contract() {
2469 let completion = completion_frame_with_session("done", "session-a");
2470 assert_eq!(frame_session(&completion), Some("session-a"));
2471 assert!(frame_is_reliable(&completion));
2472
2473 let long_running = long_running_frame_with_session("long", "session-b", 42);
2474 assert_eq!(frame_session(&long_running), Some("session-b"));
2475 assert!(!frame_is_reliable(&long_running));
2476
2477 let pattern_match = pattern_match_frame("session-c");
2478 assert_eq!(frame_session(&pattern_match), Some("session-c"));
2479 assert!(frame_is_reliable(&pattern_match));
2480
2481 let tagged_warnings = configure_warnings_frame(Some("session-d"));
2482 assert_eq!(frame_session(&tagged_warnings), Some("session-d"));
2483 assert!(frame_is_reliable(&tagged_warnings));
2484
2485 let untagged_warnings = configure_warnings_frame(None);
2486 assert_eq!(frame_session(&untagged_warnings), None);
2487 assert!(frame_is_reliable(&untagged_warnings));
2488
2489 let tagged_status = status_frame_with_session(1, Some("session-e"));
2490 assert_eq!(frame_session(&tagged_status), Some("session-e"));
2491 assert!(!frame_is_reliable(&tagged_status));
2492
2493 let project_status = status_frame(2);
2494 assert_eq!(frame_session(&project_status), None);
2495 assert!(!frame_is_reliable(&project_status));
2496
2497 let progress = progress_frame("request-1", ProgressKind::Stdout, "chunk");
2498 assert_eq!(frame_session(&progress), None);
2499 assert!(!frame_is_reliable(&progress));
2500 }
2501
2502 #[test]
2503 fn fan_out_push_frame_routes_session_scoped_and_project_scoped_frames() {
2504 let (_root_dir, root) = test_root("subc-session-routing-root");
2505 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(8);
2506 let identity1 = route_identity(&root, "session-1");
2507 let identity2 = route_identity(&root, "session-2");
2508 let mut routes = HashMap::new();
2509 routes.insert(route_key(1), identity1.clone());
2510 routes.insert(route_key(2), identity2.clone());
2511 let mut root_channels = HashMap::new();
2512 root_channels.insert(root.clone(), HashSet::from([route_key(1), route_key(2)]));
2513 let mut session_identity = HashMap::new();
2514 remember_session_identity(&mut session_identity, &identity1);
2515 remember_session_identity(&mut session_identity, &identity2);
2516 let mut retry_buffer = HashMap::new();
2517 let mut push_buffer = HashMap::new();
2518
2519 let session_result = fan_out_reliable_push_frame(
2520 &writer_tx,
2521 &routes,
2522 &root_channels,
2523 &session_identity,
2524 &mut retry_buffer,
2525 &mut push_buffer,
2526 &root,
2527 &completion_frame_with_session("session-only", "session-1"),
2528 );
2529 assert_eq!(
2530 session_result,
2531 FanOutResult {
2532 matched_channels: 1,
2533 sent_frames: 1,
2534 }
2535 );
2536 assert!(retry_buffer.is_empty());
2537 assert!(push_buffer.is_empty());
2538 let session_push = writer_rx.try_recv().expect("session push queued");
2539 assert_eq!(session_push.header.ty, FrameType::Push);
2540 assert_eq!(session_push.header.channel, 1);
2541 assert!(
2542 writer_rx.try_recv().is_err(),
2543 "session-scoped frame must not broadcast to sibling sessions"
2544 );
2545
2546 let project_result =
2547 fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(9));
2548 assert_eq!(
2549 project_result,
2550 FanOutResult {
2551 matched_channels: 2,
2552 sent_frames: 2,
2553 }
2554 );
2555 let project_channels: HashSet<_> = [
2556 writer_rx
2557 .try_recv()
2558 .expect("first project push")
2559 .header
2560 .channel,
2561 writer_rx
2562 .try_recv()
2563 .expect("second project push")
2564 .header
2565 .channel,
2566 ]
2567 .into_iter()
2568 .collect();
2569 assert_eq!(project_channels, HashSet::from([1, 2]));
2570 assert!(writer_rx.try_recv().is_err());
2571 }
2572
2573 #[test]
2574 fn push_buffer_drops_oldest_per_replay_key() {
2575 let (_root_dir, root) = test_root("subc-buffer-bound-root");
2576 let key = ReplayKey {
2577 root,
2578 harness: "opencode".to_string(),
2579 session: "session-1".to_string(),
2580 };
2581 let mut push_buffer = HashMap::new();
2582 let total = PUSH_BUFFER_MAX_PER_KEY + 3;
2583
2584 for index in 0..total {
2585 buffer_push_frame(
2586 &mut push_buffer,
2587 key.clone(),
2588 completion_frame(&format!("task-{index}")),
2589 );
2590 }
2591
2592 let buffered = push_buffer.get(&key).expect("buffer entry");
2593 assert_eq!(buffered.len(), PUSH_BUFFER_MAX_PER_KEY);
2594 let tasks: Vec<String> = buffered
2595 .iter()
2596 .filter_map(completion_task)
2597 .map(str::to_string)
2598 .collect();
2599 assert_eq!(tasks.first().map(String::as_str), Some("task-3"));
2600 assert_eq!(
2601 tasks.last().map(String::as_str),
2602 Some(format!("task-{}", total - 1).as_str())
2603 );
2604 }
2605
2606 #[test]
2607 fn replay_buffered_push_frames_drains_to_bound_channel() {
2608 let (_root_dir, root) = test_root("subc-buffer-replay-root");
2609 let key = ReplayKey {
2610 root,
2611 harness: "opencode".to_string(),
2612 session: "session-1".to_string(),
2613 };
2614 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);
2615 let mut push_buffer = HashMap::new();
2616 buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-a"));
2617 buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-b"));
2618
2619 let replayed =
2620 replay_buffered_push_frames(&writer_tx, route_key(3), &mut push_buffer, &key);
2621
2622 assert_eq!(replayed, 2);
2623 assert!(!push_buffer.contains_key(&key));
2624 for expected_task in ["task-a", "task-b"] {
2625 let frame = writer_rx.try_recv().expect("replayed push");
2626 assert_eq!(frame.header.ty, FrameType::Push);
2627 assert_eq!(frame.header.channel, 3);
2628 let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2629 assert_eq!(body["task_id"].as_str(), Some(expected_task));
2630 }
2631 assert!(writer_rx.try_recv().is_err());
2632 }
2633
2634 #[test]
2635 fn coalesce_push_batch_collapses_lossy_and_preserves_reliable_fifo() {
2636 let (_root_dir, root) = test_root("subc-coalesce-root");
2637 let (_other_dir, other_root) = test_root("subc-coalesce-other");
2638
2639 let output = coalesce_push_batch(vec![
2640 (root.clone(), status_frame(1)),
2641 (root.clone(), completion_frame("task-1")),
2642 (root.clone(), status_frame(2)),
2643 (root.clone(), completion_frame("task-2")),
2644 (root.clone(), long_running_frame("long-task", 100)),
2645 (root.clone(), long_running_frame("long-task", 200)),
2646 (other_root.clone(), status_frame(9)),
2647 ]);
2648
2649 let completion_tasks: Vec<_> = output
2650 .iter()
2651 .filter_map(|(_, frame)| completion_task(frame))
2652 .collect();
2653 assert_eq!(completion_tasks, vec!["task-1", "task-2"]);
2654
2655 let root_statuses: Vec<_> = output
2656 .iter()
2657 .filter(|(output_root, _)| output_root == &root)
2658 .filter_map(|(_, frame)| status_seq(frame))
2659 .collect();
2660 assert_eq!(root_statuses, vec![2]);
2661
2662 let other_statuses: Vec<_> = output
2663 .iter()
2664 .filter(|(output_root, _)| output_root == &other_root)
2665 .filter_map(|(_, frame)| status_seq(frame))
2666 .collect();
2667 assert_eq!(other_statuses, vec![9]);
2668
2669 let long_running_elapsed: Vec<_> = output
2670 .iter()
2671 .filter_map(|(_, frame)| match frame {
2672 PushFrame::BashLongRunning(long_running) => Some(long_running.elapsed_ms),
2673 _ => None,
2674 })
2675 .collect();
2676 assert_eq!(long_running_elapsed, vec![200]);
2677 }
2678
2679 #[test]
2680 fn coalesce_push_batch_keeps_progress_stream_keys_separate() {
2681 let (_root_dir, root) = test_root("subc-progress-coalesce-root");
2682
2683 let output = coalesce_push_batch(vec![
2684 (
2685 root.clone(),
2686 progress_frame("request-1", ProgressKind::Stdout, "old stdout"),
2687 ),
2688 (
2689 root.clone(),
2690 progress_frame("request-1", ProgressKind::Stderr, "stderr"),
2691 ),
2692 (
2693 root.clone(),
2694 progress_frame("request-2", ProgressKind::Stdout, "other stdout"),
2695 ),
2696 (
2697 root.clone(),
2698 progress_frame("request-1", ProgressKind::Stdout, "new stdout"),
2699 ),
2700 ]);
2701
2702 let progress: Vec<_> = output
2703 .iter()
2704 .filter_map(|(_, frame)| match frame {
2705 PushFrame::Progress(progress) => Some((
2706 progress.request_id.as_str(),
2707 match progress.kind {
2708 ProgressKind::Stdout => "stdout",
2709 ProgressKind::Stderr => "stderr",
2710 },
2711 progress.chunk.as_str(),
2712 )),
2713 _ => None,
2714 })
2715 .collect();
2716
2717 assert_eq!(
2718 progress,
2719 vec![
2720 ("request-1", "stderr", "stderr"),
2721 ("request-2", "stdout", "other stdout"),
2722 ("request-1", "stdout", "new stdout"),
2723 ]
2724 );
2725 }
2726
2727 #[test]
2728 fn progress_sender_keeps_reliable_off_saturated_lossy_funnel_without_blocking() {
2729 let (_root_dir, root) = test_root("subc-push-full-root");
2730 let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1);
2731 let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
2732 let sender = progress_sender_for_root(
2733 PushSenders {
2734 lossy_tx,
2735 reliable_tx,
2736 },
2737 root.clone(),
2738 );
2739
2740 let started = Instant::now();
2741 sender(status_frame(1));
2742 sender(status_frame(2));
2743 sender(completion_frame("reliable-after-lossy-full"));
2744 assert!(
2745 started.elapsed() < Duration::from_millis(50),
2746 "saturated push sender must return immediately"
2747 );
2748
2749 let (received_root, received_frame) =
2750 lossy_rx.try_recv().expect("first lossy frame queued");
2751 assert_eq!(received_root, root);
2752 assert_eq!(status_seq(&received_frame), Some(1));
2753 assert!(
2754 lossy_rx.try_recv().is_err(),
2755 "second lossy frame should be dropped"
2756 );
2757
2758 let (reliable_root, reliable_frame) = reliable_rx
2759 .try_recv()
2760 .expect("reliable frame bypasses lossy backpressure");
2761 assert_eq!(reliable_root, root);
2762 assert_eq!(
2763 completion_task(&reliable_frame),
2764 Some("reliable-after-lossy-full")
2765 );
2766 assert!(reliable_rx.try_recv().is_err());
2767 }
2768
2769 #[test]
2770 fn fan_out_lossy_push_frame_drops_when_writer_is_full_without_blocking() {
2771 let (_root_dir, root) = test_root("subc-writer-full-root");
2772 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2773 writer_tx
2774 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2775 .expect("prefill writer queue");
2776
2777 let mut root_channels = HashMap::new();
2778 root_channels.insert(root.clone(), HashSet::from([route_key(7)]));
2779
2780 let routes = HashMap::new();
2781 let started = Instant::now();
2782 let result =
2783 fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(1));
2784 assert!(
2785 started.elapsed() < Duration::from_millis(50),
2786 "saturated writer fan-out must return immediately"
2787 );
2788 assert_eq!(
2789 result,
2790 FanOutResult {
2791 matched_channels: 1,
2792 sent_frames: 0,
2793 }
2794 );
2795
2796 let queued = writer_rx
2797 .try_recv()
2798 .expect("prefilled frame remains queued");
2799 assert_eq!(queued.header.ty, FrameType::Ping);
2800 assert!(
2801 writer_rx.try_recv().is_err(),
2802 "push should be dropped on full writer"
2803 );
2804 }
2805
2806 #[test]
2807 fn reliable_push_backpressure_buffers_and_retries_on_tick() {
2808 let (_root_dir, root) = test_root("subc-retry-buffer-root");
2809 let identity = route_identity(&root, "session-1");
2810 let key = ReplayKey::from_identity(&identity);
2811 let mut routes = HashMap::new();
2812 routes.insert(route_key(9), identity.clone());
2813 let mut root_channels = HashMap::new();
2814 root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
2815 let mut session_identity = HashMap::new();
2816 remember_session_identity(&mut session_identity, &identity);
2817 let mut retry_buffer = HashMap::new();
2818 let mut push_buffer = HashMap::new();
2819 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2820 writer_tx
2821 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2822 .expect("prefill writer queue");
2823
2824 let result = fan_out_reliable_push_frame(
2825 &writer_tx,
2826 &routes,
2827 &root_channels,
2828 &session_identity,
2829 &mut retry_buffer,
2830 &mut push_buffer,
2831 &root,
2832 &completion_frame("retry-task"),
2833 );
2834
2835 assert_eq!(
2836 result,
2837 FanOutResult {
2838 matched_channels: 1,
2839 sent_frames: 0,
2840 }
2841 );
2842 assert!(push_buffer.is_empty());
2843 assert_eq!(retry_buffer.get(&route_key(9)).map(VecDeque::len), Some(1));
2844 assert_eq!(&retry_buffer[&route_key(9)][0].0, &key);
2845
2846 let queued = writer_rx.try_recv().expect("prefilled frame");
2847 assert_eq!(queued.header.ty, FrameType::Ping);
2848 assert_eq!(
2849 drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2850 1
2851 );
2852 let retried = writer_rx.try_recv().expect("retried reliable push");
2853 assert_eq!(retried.header.ty, FrameType::Push);
2854 assert_eq!(retried.header.channel, 9);
2855 assert_eq!(push_frame_task_id(&retried).as_deref(), Some("retry-task"));
2856 assert!(!retry_buffer.contains_key(&route_key(9)));
2857 }
2858
2859 #[test]
2860 fn reliable_push_fifo_gates_new_frames_behind_retry_buffer() {
2861 let (_root_dir, root) = test_root("subc-retry-fifo-root");
2862 let identity = route_identity(&root, "session-1");
2863 let mut routes = HashMap::new();
2864 routes.insert(route_key(9), identity.clone());
2865 let mut root_channels = HashMap::new();
2866 root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
2867 let mut session_identity = HashMap::new();
2868 remember_session_identity(&mut session_identity, &identity);
2869 let mut retry_buffer = HashMap::new();
2870 let mut push_buffer = HashMap::new();
2871 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2872 writer_tx
2873 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2874 .expect("prefill writer queue");
2875
2876 let first = completion_frame("fifo-1");
2877 let second = completion_frame("fifo-2");
2878 let _ = fan_out_reliable_push_frame(
2879 &writer_tx,
2880 &routes,
2881 &root_channels,
2882 &session_identity,
2883 &mut retry_buffer,
2884 &mut push_buffer,
2885 &root,
2886 &first,
2887 );
2888 let queued = writer_rx.try_recv().expect("free writer capacity");
2889 assert_eq!(queued.header.ty, FrameType::Ping);
2890
2891 let _ = fan_out_reliable_push_frame(
2892 &writer_tx,
2893 &routes,
2894 &root_channels,
2895 &session_identity,
2896 &mut retry_buffer,
2897 &mut push_buffer,
2898 &root,
2899 &second,
2900 );
2901 assert!(
2902 writer_rx.try_recv().is_err(),
2903 "second reliable frame must not bypass pending retry frame"
2904 );
2905 let queued_tasks: Vec<_> = retry_buffer[&route_key(9)]
2906 .iter()
2907 .filter_map(|(_, frame)| completion_task(frame))
2908 .collect();
2909 assert_eq!(queued_tasks, vec!["fifo-1", "fifo-2"]);
2910
2911 assert_eq!(
2912 drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2913 1
2914 );
2915 let first_sent = writer_rx.try_recv().expect("first reliable push");
2916 assert_eq!(push_frame_task_id(&first_sent).as_deref(), Some("fifo-1"));
2917 assert_eq!(
2918 drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2919 1
2920 );
2921 let second_sent = writer_rx.try_recv().expect("second reliable push");
2922 assert_eq!(push_frame_task_id(&second_sent).as_deref(), Some("fifo-2"));
2923 assert!(!retry_buffer.contains_key(&route_key(9)));
2924 }
2925
2926 #[test]
2927 fn replay_buffered_push_frames_drains_incrementally_on_backpressure() {
2928 let (_root_dir, root) = test_root("subc-incremental-replay-root");
2929 let key = ReplayKey {
2930 root,
2931 harness: "opencode".to_string(),
2932 session: "session-1".to_string(),
2933 };
2934 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(2);
2935 writer_tx
2936 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2937 .expect("prefill writer queue");
2938 let mut push_buffer = HashMap::new();
2939 for task in ["replay-1", "replay-2", "replay-3"] {
2940 buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(task));
2941 }
2942
2943 assert_eq!(
2944 replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
2945 1
2946 );
2947 assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(2));
2948 let remaining: Vec<_> = push_buffer[&key]
2949 .iter()
2950 .filter_map(completion_task)
2951 .collect();
2952 assert_eq!(remaining, vec!["replay-2", "replay-3"]);
2953
2954 let queued = writer_rx.try_recv().expect("prefilled frame");
2955 assert_eq!(queued.header.ty, FrameType::Ping);
2956 let first = writer_rx.try_recv().expect("first replayed push");
2957 assert_eq!(push_frame_task_id(&first).as_deref(), Some("replay-1"));
2958
2959 assert_eq!(
2960 replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
2961 2
2962 );
2963 let second = writer_rx.try_recv().expect("second replayed push");
2964 let third = writer_rx.try_recv().expect("third replayed push");
2965 assert_eq!(push_frame_task_id(&second).as_deref(), Some("replay-2"));
2966 assert_eq!(push_frame_task_id(&third).as_deref(), Some("replay-3"));
2967 assert!(!push_buffer.contains_key(&key));
2968 }
2969
2970 #[test]
2971 fn goodbye_migrates_retry_buffer_into_detach_replay() {
2972 let (_root_dir, root) = test_root("subc-goodbye-migration-root");
2973 let key = ReplayKey {
2974 root,
2975 harness: "opencode".to_string(),
2976 session: "session-1".to_string(),
2977 };
2978 let mut retry_buffer = HashMap::new();
2979 buffer_retry_frame(
2980 &mut retry_buffer,
2981 route_key(5),
2982 key.clone(),
2983 completion_frame("migrated-task"),
2984 );
2985 let mut push_buffer = HashMap::new();
2986
2987 assert_eq!(
2988 migrate_retry_buffer_to_push_buffer(&mut retry_buffer, route_key(5), &mut push_buffer),
2989 1
2990 );
2991
2992 assert!(!retry_buffer.contains_key(&route_key(5)));
2993 assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(1));
2994 assert_eq!(
2995 completion_task(&push_buffer[&key][0]),
2996 Some("migrated-task")
2997 );
2998 }
2999
3000 #[test]
3001 fn permanent_push_send_failure_is_dropped_not_retried_forever() {
3002 let (_root_dir, root) = test_root("subc-permanent-failure-root");
3003 let key = ReplayKey {
3004 root,
3005 harness: "opencode".to_string(),
3006 session: "session-1".to_string(),
3007 };
3008 let (writer_tx, writer_rx) = mpsc::channel::<Frame>(1);
3009 drop(writer_rx);
3010
3011 let mut push_buffer = HashMap::new();
3012 buffer_push_frame(
3013 &mut push_buffer,
3014 key.clone(),
3015 completion_frame("closed-replay"),
3016 );
3017 assert_eq!(
3018 replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
3019 0
3020 );
3021 assert!(!push_buffer.contains_key(&key));
3022
3023 let mut retry_buffer = HashMap::new();
3024 buffer_retry_frame(
3025 &mut retry_buffer,
3026 route_key(4),
3027 key,
3028 completion_frame("closed-retry"),
3029 );
3030 assert_eq!(
3031 drain_retry_buffer_for_channel(&writer_tx, route_key(4), &mut retry_buffer),
3032 0
3033 );
3034 assert!(!retry_buffer.contains_key(&route_key(4)));
3035 }
3036
3037 #[test]
3038 fn completed_task_suppresses_stale_long_running_lossy_push() {
3039 let mut completed_tasks = CompletedTaskIds::default();
3040 assert!(!should_drop_lossy_push(
3041 &completed_tasks,
3042 &long_running_frame("stale-task", 100)
3043 ));
3044
3045 completed_tasks.remember("stale-task");
3046
3047 assert!(should_drop_lossy_push(
3048 &completed_tasks,
3049 &long_running_frame("stale-task", 200)
3050 ));
3051 assert!(!should_drop_lossy_push(
3052 &completed_tasks,
3053 &long_running_frame("other-task", 200)
3054 ));
3055 }
3056
3057 #[tokio::test]
3058 async fn control_send_times_out_when_writer_queue_remains_full() {
3059 let (writer_tx, _writer_rx) = mpsc::channel::<Frame>(1);
3060 writer_tx
3061 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
3062 .expect("prefill writer queue");
3063 let started = Instant::now();
3064
3065 let result = send_frame(
3066 &writer_tx,
3067 Frame::build(FrameType::Pong, control_flags(), 0, 2, Vec::new()).unwrap(),
3068 )
3069 .await;
3070
3071 assert!(matches!(result, Err(SubcError::WriterBackpressureTimeout)));
3072 assert!(
3073 started.elapsed() < Duration::from_secs(2),
3074 "control send guard should be bounded"
3075 );
3076 }
3077
3078 const CORE_TOOLS: [&str; 8] = [
3079 "status", "read", "grep", "search", "outline", "inspect", "edit", "write",
3080 ];
3081
3082 fn is_bare_placeholder_schema(schema: &Value) -> bool {
3083 schema == &json!({ "type": "object" })
3084 }
3085
3086 #[test]
3087 fn build_manifest_serves_embedded_tool_schemas() {
3088 let manifest = build_manifest();
3089 let tools = match manifest.provides.first() {
3090 Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3091 _ => panic!("expected ToolProvider"),
3092 };
3093 let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3094 for name in CORE_TOOLS {
3095 let tool = by_name
3096 .get(name)
3097 .unwrap_or_else(|| panic!("missing tool {name}"));
3098 assert!(
3099 !is_bare_placeholder_schema(&tool.schema),
3100 "{name} must not use bare placeholder schema"
3101 );
3102 assert_eq!(
3103 tool.schema.get("type").and_then(|v| v.as_str()),
3104 Some("object"),
3105 "{name} schema must be an object"
3106 );
3107 }
3108
3109 let read = by_name["read"]
3110 .schema
3111 .get("properties")
3112 .and_then(|p| p.as_object());
3113 let read_props = read.expect("read schema properties");
3114 assert!(
3115 read_props.contains_key("filePath"),
3116 "read schema must expose filePath"
3117 );
3118
3119 let status = &by_name["status"].schema;
3120 assert_eq!(
3121 status.get("properties").and_then(|v| v.as_object()),
3122 Some(&serde_json::Map::new()),
3123 "status schema must have empty properties"
3124 );
3125 assert_eq!(
3126 status.get("additionalProperties").and_then(|v| v.as_bool()),
3127 Some(false),
3128 "status schema must forbid additionalProperties"
3129 );
3130 }
3131
3132 #[test]
3133 fn build_manifest_classifies_execution_mode_by_observable_effect() {
3134 let manifest = build_manifest();
3135 let tools = match manifest.provides.first() {
3136 Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3137 _ => panic!("expected ToolProvider"),
3138 };
3139 let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3140
3141 for name in ["status", "read", "grep", "search", "outline", "inspect"] {
3144 assert_eq!(
3145 by_name[name].execution_mode,
3146 ExecutionMode::Pure,
3147 "{name} produces no observable side effect and must be Pure"
3148 );
3149 }
3150 for name in ["edit", "write"] {
3152 assert_eq!(
3153 by_name[name].execution_mode,
3154 ExecutionMode::Mutating,
3155 "{name} writes files and must be Mutating"
3156 );
3157 }
3158 }
3159}
3160
3161#[derive(Debug)]
3162pub enum SubcError {
3163 Runtime(std::io::Error),
3164 ConnectionFile {
3165 path: PathBuf,
3166 source: subc_transport::ConnectionFileError,
3167 },
3168 NoEndpoint {
3169 path: PathBuf,
3170 },
3171 InvalidEndpoint {
3172 path: PathBuf,
3173 endpoint: String,
3174 },
3175 Connect {
3176 endpoint: String,
3177 source: std::io::Error,
3178 },
3179 Auth {
3180 endpoint: String,
3181 source: subc_transport::AuthError,
3182 },
3183 FrameIo(subc_transport::FrameIoError),
3184 FrameBuild(subc_protocol::FrameBuildError),
3185 WriterClosed,
3186 WriterBackpressureTimeout,
3187 WriterJoin(tokio::task::JoinError),
3188 Json(serde_json::Error),
3189 ClosedBeforeHelloAck,
3190 HelloRejected {
3191 body: Option<ErrorBody>,
3192 },
3193 UnexpectedFrame {
3194 ty: FrameType,
3195 },
3196}
3197
3198impl fmt::Display for SubcError {
3199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3200 match self {
3201 Self::Runtime(e) => write!(f, "failed to build subc tokio runtime: {e}"),
3202 Self::ConnectionFile { path, source } => {
3203 write!(f, "failed to read subc connection file {path:?}: {source}")
3204 }
3205 Self::NoEndpoint { path } => {
3206 write!(f, "subc connection file {path:?} has no endpoints")
3207 }
3208 Self::InvalidEndpoint { path, endpoint } => {
3209 write!(
3210 f,
3211 "subc connection file {path:?} has invalid endpoint {endpoint}"
3212 )
3213 }
3214 Self::Connect { endpoint, source } => {
3215 write!(f, "failed to connect to subc endpoint {endpoint}: {source}")
3216 }
3217 Self::Auth { endpoint, source } => {
3218 write!(
3219 f,
3220 "failed to authenticate to subc endpoint {endpoint}: {source}"
3221 )
3222 }
3223 Self::FrameIo(e) => write!(f, "subc frame I/O error: {e}"),
3224 Self::FrameBuild(e) => write!(f, "subc frame build error: {e}"),
3225 Self::WriterClosed => write!(f, "subc writer task closed"),
3226 Self::WriterBackpressureTimeout => write!(
3227 f,
3228 "subc writer task stayed backpressured while sending a control frame"
3229 ),
3230 Self::WriterJoin(e) => write!(f, "subc writer task join error: {e}"),
3231 Self::Json(e) => write!(f, "subc JSON error: {e}"),
3232 Self::ClosedBeforeHelloAck => {
3233 write!(f, "subc daemon closed the connection before HelloAck")
3234 }
3235 Self::HelloRejected { body } => match body {
3236 Some(b) => write!(f, "subc rejected ModuleHello: {} ({})", b.code, b.message),
3237 None => write!(f, "subc rejected ModuleHello (unparseable error body)"),
3238 },
3239 Self::UnexpectedFrame { ty } => {
3240 write!(f, "subc sent unexpected frame in place of HelloAck: {ty:?}")
3241 }
3242 }
3243 }
3244}
3245
3246impl std::error::Error for SubcError {}