1use std::collections::{HashMap, HashSet, VecDeque};
15use std::fmt;
16use std::net::{IpAddr, SocketAddr};
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use serde::Deserialize;
22use serde_json::{json, Value};
23
24use crate::config::Config;
25use crate::config_resolve::ConfigTier;
26use crate::context::{App, AppContext, ProgressSender};
27use crate::executor::{Executor, Lane};
28use crate::log_ctx;
29use crate::path_identity::ProjectRootId;
30use crate::protocol::{ProgressKind, PushFrame, RawRequest, Response};
31use crate::runtime_drain;
32
33use subc_protocol::manifest::{
34 Bindings, Concurrency, ConfigBinding, ConfigSource, ExecutionMode, IdentityBinding,
35 IdentityScope, ModuleManifest, ProviderRole, StorageBinding, StorageKind, StorageScope, Tool,
36 TrustTier,
37};
38use subc_protocol::session::{ModuleControlRequest, ModuleControlResponse};
39use subc_protocol::{
40 ErrorBody, Flags, Frame, FrameType, ModuleHelloBody, Priority, PROTOCOL_VERSION,
41};
42use subc_transport::{authenticate_client, connection_file, read_frame, write_frame};
43use tokio::io::{AsyncRead, AsyncWrite};
44use tokio::net::TcpStream;
45use tokio::sync::{mpsc, oneshot, Notify};
46use tokio::task::JoinHandle;
47
48const AUTH_DEADLINE: Duration = Duration::from_secs(5);
52
53const HELLO_CORR: u64 = 1;
55
56const PUSH_BUFFER_MAX_PER_KEY: usize = 256;
59
60const CONTROL_SEND_TIMEOUT: Duration = Duration::from_millis(250);
64
65const COMPLETED_TASK_SUPPRESSION_MAX: usize = 4096;
68
69type RouteChannel = u32;
70type PushEnvelope = (ProjectRootId, PushFrame);
71type RetryBuffer = HashMap<RouteChannel, VecDeque<(ReplayKey, PushFrame)>>;
72
73#[derive(Clone)]
74struct PushSenders {
75 lossy_tx: mpsc::Sender<PushEnvelope>,
76 reliable_tx: mpsc::UnboundedSender<PushEnvelope>,
77}
78
79#[derive(Debug)]
80struct RootMeta {
81 maintenance_pending: bool,
82 last_touched: Instant,
83 diagnostics_on_edit: bool,
84}
85
86#[derive(Debug)]
87struct PendingBind {
88 bind_root_id: ProjectRootId,
89 inserted_new_actor: bool,
90 cancelled: bool,
91}
92
93struct RouteBindCompletion {
94 route_channel: u16,
95 identity: RouteIdentity,
96 bind_root_id: ProjectRootId,
97 inserted_new_actor: bool,
98 configure_response: Response,
99 drain_response: Option<Response>,
100 diagnostics_on_edit: bool,
101 ver: u8,
102 corr: u64,
103 flags: Flags,
104}
105
106#[derive(Debug, Clone)]
107struct RouteIdentity {
108 root: ProjectRootId,
109 project_root: PathBuf,
110 harness: String,
111 session: String,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Hash)]
115struct ReplayKey {
116 root: ProjectRootId,
117 harness: String,
118 session: String,
119}
120
121impl ReplayKey {
122 fn from_identity(identity: &RouteIdentity) -> Self {
123 Self {
124 root: identity.root.clone(),
125 harness: identity.harness.clone(),
126 session: identity.session.clone(),
127 }
128 }
129}
130
131#[derive(Debug, Default)]
132struct CompletedTaskIds {
133 order: VecDeque<String>,
134 set: HashSet<String>,
135}
136
137impl CompletedTaskIds {
138 fn remember(&mut self, task_id: &str) {
139 if self.set.contains(task_id) {
140 return;
141 }
142 if self.order.len() >= COMPLETED_TASK_SUPPRESSION_MAX {
143 if let Some(evicted) = self.order.pop_front() {
144 self.set.remove(&evicted);
145 }
146 }
147 let task_id = task_id.to_string();
148 self.order.push_back(task_id.clone());
149 self.set.insert(task_id);
150 }
151
152 fn contains(&self, task_id: &str) -> bool {
153 self.set.contains(task_id)
154 }
155}
156
157impl RootMeta {
158 fn new(now: Instant) -> Self {
159 Self {
160 maintenance_pending: false,
161 last_touched: now,
162 diagnostics_on_edit: false,
163 }
164 }
165
166 fn touch(&mut self) {
167 self.last_touched = Instant::now();
168 }
169}
170
171fn route_key(channel: u16) -> RouteChannel {
172 RouteChannel::from(channel)
173}
174
175fn remove_root_channel(
176 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
177 root: &ProjectRootId,
178 channel: RouteChannel,
179) {
180 let remove_root = if let Some(channels) = root_channels.get_mut(root) {
181 channels.remove(&channel);
182 channels.is_empty()
183 } else {
184 false
185 };
186 if remove_root {
187 root_channels.remove(root);
188 }
189}
190
191fn remove_route_channel(
192 routes: &mut HashMap<RouteChannel, RouteIdentity>,
193 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
194 channel: RouteChannel,
195) -> Option<RouteIdentity> {
196 let removed = routes.remove(&channel);
197 if let Some(identity) = &removed {
198 remove_root_channel(root_channels, &identity.root, channel);
199 }
200 removed
201}
202
203fn insert_route_channel(
204 routes: &mut HashMap<RouteChannel, RouteIdentity>,
205 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
206 channel: RouteChannel,
207 identity: RouteIdentity,
208) {
209 if let Some(previous) = routes.insert(channel, identity.clone()) {
210 remove_root_channel(root_channels, &previous.root, channel);
211 }
212 root_channels
213 .entry(identity.root.clone())
214 .or_default()
215 .insert(channel);
216}
217
218fn remember_session_identity(
219 session_identity: &mut HashMap<(ProjectRootId, String), String>,
220 identity: &RouteIdentity,
221) {
222 session_identity.insert(
227 (identity.root.clone(), identity.session.clone()),
228 identity.harness.clone(),
229 );
230}
231
232fn replay_key_for_session(
233 session_identity: &HashMap<(ProjectRootId, String), String>,
234 root: &ProjectRootId,
235 session: &str,
236) -> Option<ReplayKey> {
237 let harness = session_identity.get(&(root.clone(), session.to_string()))?;
238 Some(ReplayKey {
239 root: root.clone(),
240 harness: harness.clone(),
241 session: session.to_string(),
242 })
243}
244
245fn frame_session(frame: &PushFrame) -> Option<&str> {
246 match frame {
247 PushFrame::BashCompleted(completed) => Some(completed.session_id.as_str()),
248 PushFrame::BashLongRunning(long_running) => Some(long_running.session_id.as_str()),
249 PushFrame::BashPatternMatch(pattern_match) => Some(pattern_match.session_id.as_str()),
250 PushFrame::ConfigureWarnings(warnings) => warnings.session_id.as_deref(),
251 PushFrame::StatusChanged(status) => status.session_id.as_deref(),
252 PushFrame::Progress(_) => None,
253 }
254}
255
256fn frame_is_reliable(frame: &PushFrame) -> bool {
257 matches!(
258 frame,
259 PushFrame::BashCompleted(_)
260 | PushFrame::BashPatternMatch(_)
261 | PushFrame::ConfigureWarnings(_)
262 )
263}
264
265fn completed_task_id(frame: &PushFrame) -> Option<&str> {
266 match frame {
267 PushFrame::BashCompleted(completed) => Some(completed.task_id.as_str()),
268 _ => None,
269 }
270}
271
272fn long_running_task_id(frame: &PushFrame) -> Option<&str> {
273 match frame {
274 PushFrame::BashLongRunning(long_running) => Some(long_running.task_id.as_str()),
275 _ => None,
276 }
277}
278
279fn should_drop_lossy_push(completed_tasks: &CompletedTaskIds, frame: &PushFrame) -> bool {
280 long_running_task_id(frame).is_some_and(|task_id| completed_tasks.contains(task_id))
281}
282
283fn progress_sender_for_root(push_senders: PushSenders, root_id: ProjectRootId) -> ProgressSender {
284 Arc::new(Box::new(move |frame: PushFrame| {
285 if frame_is_reliable(&frame) {
290 let _ = push_senders.reliable_tx.send((root_id.clone(), frame));
291 } else {
292 let _ = push_senders.lossy_tx.try_send((root_id.clone(), frame));
293 }
294 }))
295}
296
297#[derive(Debug, Clone, PartialEq, Eq, Hash)]
298enum LossyProgressKind {
299 Stdout,
300 Stderr,
301}
302
303impl From<&ProgressKind> for LossyProgressKind {
304 fn from(kind: &ProgressKind) -> Self {
305 match kind {
306 ProgressKind::Stdout => Self::Stdout,
307 ProgressKind::Stderr => Self::Stderr,
308 }
309 }
310}
311
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313enum LossyPushKey {
314 Progress {
315 request_id: String,
316 kind: LossyProgressKind,
317 },
318 StatusChanged,
319 BashLongRunning {
320 task_id: String,
321 },
322}
323
324fn lossy_push_key(frame: &PushFrame) -> Option<LossyPushKey> {
325 match frame {
326 PushFrame::Progress(progress) => Some(LossyPushKey::Progress {
327 request_id: progress.request_id.clone(),
328 kind: LossyProgressKind::from(&progress.kind),
329 }),
330 PushFrame::StatusChanged(_) => Some(LossyPushKey::StatusChanged),
331 PushFrame::BashLongRunning(long_running) => Some(LossyPushKey::BashLongRunning {
332 task_id: long_running.task_id.clone(),
333 }),
334 PushFrame::BashCompleted(_)
335 | PushFrame::BashPatternMatch(_)
336 | PushFrame::ConfigureWarnings(_) => None,
337 }
338}
339
340fn coalesce_push_batch(batch: Vec<(ProjectRootId, PushFrame)>) -> Vec<(ProjectRootId, PushFrame)> {
341 let mut slots: Vec<Option<(ProjectRootId, PushFrame)>> = Vec::with_capacity(batch.len());
342 let mut latest_lossy: HashMap<(ProjectRootId, LossyPushKey), usize> = HashMap::new();
343
344 for (root, frame) in batch {
345 if let Some(lossy_key) = lossy_push_key(&frame) {
346 let map_key = (root.clone(), lossy_key);
347 if let Some(previous_index) = latest_lossy.insert(map_key, slots.len()) {
348 slots[previous_index] = None;
349 }
350 }
351 slots.push(Some((root, frame)));
352 }
353
354 slots.into_iter().flatten().collect()
355}
356
357#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
358struct FanOutResult {
359 matched_channels: usize,
363 sent_frames: usize,
366}
367
368#[derive(Debug, Clone, Copy, PartialEq, Eq)]
369enum PushSendOutcome {
370 Sent,
371 Backpressure,
372 PermanentFailure,
373}
374
375fn try_send_push_body(
376 writer_tx: &mpsc::Sender<Frame>,
377 channel: RouteChannel,
378 body: &[u8],
379) -> PushSendOutcome {
380 let Ok(route_channel) = u16::try_from(channel) else {
381 log::warn!("subc attach: invalid route channel {channel} for Push fan-out");
382 return PushSendOutcome::PermanentFailure;
383 };
384 let push_frame = match Frame::build_with_version(
385 PROTOCOL_VERSION,
386 FrameType::Push,
387 control_flags(),
388 route_channel,
389 0,
390 body.to_vec(),
391 ) {
392 Ok(frame) => frame,
393 Err(error) => {
394 log::warn!("subc attach: failed to build Push frame: {error}");
395 return PushSendOutcome::PermanentFailure;
396 }
397 };
398 match writer_tx.try_send(push_frame) {
399 Ok(()) => PushSendOutcome::Sent,
400 Err(mpsc::error::TrySendError::Full(_)) => PushSendOutcome::Backpressure,
401 Err(mpsc::error::TrySendError::Closed(_)) => {
402 log::warn!("subc attach: writer closed while sending Push frame");
403 PushSendOutcome::PermanentFailure
404 }
405 }
406}
407
408fn try_send_push_frame(
409 writer_tx: &mpsc::Sender<Frame>,
410 channel: RouteChannel,
411 frame: &PushFrame,
412) -> PushSendOutcome {
413 let body = match serde_json::to_vec(frame) {
414 Ok(body) => body,
415 Err(error) => {
416 log::warn!("subc attach: failed to serialize PushFrame: {error}");
417 return PushSendOutcome::PermanentFailure;
418 }
419 };
420 try_send_push_body(writer_tx, channel, &body)
421}
422
423fn bounded_push_back<T>(queue: &mut VecDeque<T>, item: T) {
424 if queue.len() >= PUSH_BUFFER_MAX_PER_KEY {
425 queue.pop_front();
426 }
427 queue.push_back(item);
428}
429
430fn buffer_push_frame(
431 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
432 key: ReplayKey,
433 frame: PushFrame,
434) {
435 bounded_push_back(push_buffer.entry(key).or_default(), frame);
436}
437
438fn buffer_retry_frame(
439 retry_buffer: &mut RetryBuffer,
440 channel: RouteChannel,
441 key: ReplayKey,
442 frame: PushFrame,
443) {
444 bounded_push_back(retry_buffer.entry(channel).or_default(), (key, frame));
445}
446
447fn migrate_retry_buffer_to_push_buffer(
448 retry_buffer: &mut RetryBuffer,
449 channel: RouteChannel,
450 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
451) -> usize {
452 let Some(frames) = retry_buffer.remove(&channel) else {
453 return 0;
454 };
455 let migrated = frames.len();
456 for (key, frame) in frames {
457 buffer_push_frame(push_buffer, key, frame);
458 }
459 migrated
460}
461
462fn replay_buffered_push_frames(
463 writer_tx: &mpsc::Sender<Frame>,
464 channel: RouteChannel,
465 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
466 key: &ReplayKey,
467) -> usize {
468 let mut sent = 0;
469 let remove_empty;
470
471 {
472 let Some(queue) = push_buffer.get_mut(key) else {
473 return 0;
474 };
475
476 while let Some(frame) = queue.pop_front() {
477 match try_send_push_frame(writer_tx, channel, &frame) {
478 PushSendOutcome::Sent => sent += 1,
479 PushSendOutcome::Backpressure => {
480 queue.push_front(frame);
481 break;
482 }
483 PushSendOutcome::PermanentFailure => {
484 log::warn!(
485 "subc attach: dropping buffered reliable Push for root {} harness {} session {} after permanent send failure",
486 key.root.as_path().display(),
487 key.harness,
488 key.session
489 );
490 }
491 }
492 }
493
494 remove_empty = queue.is_empty();
495 }
496
497 if remove_empty {
498 push_buffer.remove(key);
499 }
500
501 sent
502}
503
504fn drain_retry_buffer_for_channel(
505 writer_tx: &mpsc::Sender<Frame>,
506 channel: RouteChannel,
507 retry_buffer: &mut RetryBuffer,
508) -> usize {
509 let mut sent = 0;
510 let remove_empty;
511
512 {
513 let Some(queue) = retry_buffer.get_mut(&channel) else {
514 return 0;
515 };
516
517 while let Some((key, frame)) = queue.pop_front() {
518 match try_send_push_frame(writer_tx, channel, &frame) {
519 PushSendOutcome::Sent => sent += 1,
520 PushSendOutcome::Backpressure => {
521 queue.push_front((key, frame));
522 break;
523 }
524 PushSendOutcome::PermanentFailure => {
525 log::warn!(
526 "subc attach: dropping retry-buffered reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
527 key.root.as_path().display(),
528 key.harness,
529 key.session
530 );
531 }
532 }
533 }
534
535 remove_empty = queue.is_empty();
536 }
537
538 if remove_empty {
539 retry_buffer.remove(&channel);
540 }
541
542 sent
543}
544
545fn drain_retry_buffers_for_bound_routes(
546 writer_tx: &mpsc::Sender<Frame>,
547 routes: &HashMap<RouteChannel, RouteIdentity>,
548 retry_buffer: &mut RetryBuffer,
549) -> usize {
550 let channels: Vec<RouteChannel> = routes.keys().copied().collect();
551 channels
552 .into_iter()
553 .map(|channel| drain_retry_buffer_for_channel(writer_tx, channel, retry_buffer))
554 .sum()
555}
556
557fn matching_route_channels(
558 routes: &HashMap<RouteChannel, RouteIdentity>,
559 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
560 root: &ProjectRootId,
561 frame: &PushFrame,
562) -> Vec<RouteChannel> {
563 let Some(channels) = root_channels.get(root) else {
564 return Vec::new();
565 };
566
567 let session = frame_session(frame);
568 channels
569 .iter()
570 .copied()
571 .filter(|channel| match session {
572 Some(session) => routes
573 .get(channel)
574 .is_some_and(|identity| identity.session == session),
575 None => true,
576 })
577 .collect()
578}
579
580fn buffer_detached_reliable_push_frame(
581 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
582 session_identity: &HashMap<(ProjectRootId, String), String>,
583 root: &ProjectRootId,
584 frame: &PushFrame,
585) {
586 let Some(session) = frame_session(frame) else {
587 log::warn!(
588 "subc attach: dropping reliable project-scoped Push for root {} because no route is bound",
589 root.as_path().display()
590 );
591 return;
592 };
593
594 if let Some(key) = replay_key_for_session(session_identity, root, session) {
595 buffer_push_frame(push_buffer, key, frame.clone());
596 } else {
597 log::warn!(
598 "subc attach: dropping reliable Push for root {} session {} because no retained harness identity is known",
599 root.as_path().display(),
600 session
601 );
602 }
603}
604
605fn fan_out_lossy_push_frame(
606 writer_tx: &mpsc::Sender<Frame>,
607 routes: &HashMap<RouteChannel, RouteIdentity>,
608 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
609 root: &ProjectRootId,
610 frame: &PushFrame,
611) -> FanOutResult {
612 let matching_channels = matching_route_channels(routes, root_channels, root, frame);
613 let matched_channels = matching_channels.len();
614 if matched_channels == 0 {
615 return FanOutResult::default();
616 }
617
618 let body = match serde_json::to_vec(frame) {
619 Ok(body) => body,
620 Err(error) => {
621 log::warn!("subc attach: failed to serialize PushFrame for fan-out: {error}");
622 return FanOutResult {
623 matched_channels,
624 sent_frames: 0,
625 };
626 }
627 };
628
629 let sent_frames = matching_channels
630 .into_iter()
631 .filter(|&channel| {
632 matches!(
633 try_send_push_body(writer_tx, channel, &body),
634 PushSendOutcome::Sent
635 )
636 })
637 .count();
638
639 FanOutResult {
640 matched_channels,
641 sent_frames,
642 }
643}
644
645fn fan_out_reliable_push_frame(
646 writer_tx: &mpsc::Sender<Frame>,
647 routes: &HashMap<RouteChannel, RouteIdentity>,
648 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
649 session_identity: &HashMap<(ProjectRootId, String), String>,
650 retry_buffer: &mut RetryBuffer,
651 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
652 root: &ProjectRootId,
653 frame: &PushFrame,
654) -> FanOutResult {
655 let matching_channels = matching_route_channels(routes, root_channels, root, frame);
656 let matched_channels = matching_channels.len();
657 if matched_channels == 0 {
658 buffer_detached_reliable_push_frame(push_buffer, session_identity, root, frame);
659 return FanOutResult::default();
660 }
661
662 let mut sent_frames = 0;
663 for channel in matching_channels {
664 let Some(identity) = routes.get(&channel) else {
665 log::warn!(
666 "subc attach: dropping reliable Push for stale route channel {channel} with no route identity"
667 );
668 continue;
669 };
670 let key = ReplayKey::from_identity(identity);
671
672 if retry_buffer
673 .get(&channel)
674 .is_some_and(|queue| !queue.is_empty())
675 {
676 buffer_retry_frame(retry_buffer, channel, key, frame.clone());
677 continue;
678 }
679
680 match try_send_push_frame(writer_tx, channel, frame) {
681 PushSendOutcome::Sent => sent_frames += 1,
682 PushSendOutcome::Backpressure => {
683 buffer_retry_frame(retry_buffer, channel, key, frame.clone());
684 }
685 PushSendOutcome::PermanentFailure => {
686 log::warn!(
687 "subc attach: dropping reliable Push for route {channel} root {} harness {} session {} after permanent send failure",
688 key.root.as_path().display(),
689 key.harness,
690 key.session
691 );
692 }
693 }
694 }
695
696 FanOutResult {
697 matched_channels,
698 sent_frames,
699 }
700}
701
702fn process_reliable_push_frame(
703 writer_tx: &mpsc::Sender<Frame>,
704 routes: &HashMap<RouteChannel, RouteIdentity>,
705 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
706 session_identity: &HashMap<(ProjectRootId, String), String>,
707 retry_buffer: &mut RetryBuffer,
708 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
709 completed_tasks: &mut CompletedTaskIds,
710 root: ProjectRootId,
711 frame: PushFrame,
712) {
713 if let Some(task_id) = completed_task_id(&frame) {
714 completed_tasks.remember(task_id);
715 }
716 let _ = fan_out_reliable_push_frame(
717 writer_tx,
718 routes,
719 root_channels,
720 session_identity,
721 retry_buffer,
722 push_buffer,
723 &root,
724 &frame,
725 );
726}
727
728fn process_lossy_push_frame(
729 writer_tx: &mpsc::Sender<Frame>,
730 routes: &HashMap<RouteChannel, RouteIdentity>,
731 root_channels: &HashMap<ProjectRootId, HashSet<RouteChannel>>,
732 completed_tasks: &CompletedTaskIds,
733 root: ProjectRootId,
734 frame: PushFrame,
735) {
736 if should_drop_lossy_push(completed_tasks, &frame) {
737 if let Some(task_id) = long_running_task_id(&frame) {
738 log::debug!(
739 "subc attach: dropping stale BashLongRunning Push for completed task {task_id}"
740 );
741 }
742 return;
743 }
744
745 let _ = fan_out_lossy_push_frame(writer_tx, routes, root_channels, &root, &frame);
746}
747
748pub type DispatchFn = fn(RawRequest, &AppContext) -> Response;
751
752pub fn run_subc_mode(
757 connection_file_path: &Path,
758 ctx: Arc<AppContext>,
759 executor: Arc<Executor>,
760 dispatch: DispatchFn,
761 user_config_path: Option<PathBuf>,
762) -> Result<(), SubcError> {
763 run_subc_mode_inner(
767 connection_file_path,
768 ctx,
769 executor,
770 dispatch,
771 user_config_path,
772 false,
773 )
774}
775
776fn run_subc_mode_inner(
777 connection_file_path: &Path,
778 ctx: Arc<AppContext>,
779 executor: Arc<Executor>,
780 dispatch: DispatchFn,
781 user_config_path: Option<PathBuf>,
782 allow_native_passthrough: bool,
783) -> Result<(), SubcError> {
784 let runtime = tokio::runtime::Builder::new_current_thread()
785 .enable_all()
786 .build()
787 .map_err(SubcError::Runtime)?;
788
789 let executor_for_loop = Arc::clone(&executor);
790 let loop_result = runtime.block_on(async move {
791 let shared_app = ctx.app();
792 drop(ctx);
793 let stream = connect_and_authenticate(connection_file_path).await?;
794 log::info!(
795 "subc attach: authenticated to daemon via {}",
796 connection_file_path.display()
797 );
798 let (read_half, write_half) = tokio::io::split(stream);
799 run_module_loop(
800 read_half,
801 write_half,
802 shared_app,
803 executor_for_loop,
804 dispatch,
805 user_config_path,
806 allow_native_passthrough,
807 )
808 .await
809 });
810
811 for actor_ctx in executor.actor_contexts() {
812 actor_ctx.lsp().shutdown_all();
813 actor_ctx.bash_background().detach();
814 }
815
816 loop_result
817}
818
819#[doc(hidden)]
824pub fn run_subc_mode_for_test(
825 connection_file_path: &Path,
826 ctx: Arc<AppContext>,
827 executor: Arc<Executor>,
828 dispatch: DispatchFn,
829 user_config_path: Option<PathBuf>,
830) -> Result<(), SubcError> {
831 run_subc_mode_inner(
832 connection_file_path,
833 ctx,
834 executor,
835 dispatch,
836 user_config_path,
837 true,
838 )
839}
840
841async fn connect_and_authenticate(connection_file_path: &Path) -> Result<TcpStream, SubcError> {
844 let conn = connection_file::read(connection_file_path).map_err(|source| {
845 SubcError::ConnectionFile {
846 path: connection_file_path.to_path_buf(),
847 source,
848 }
849 })?;
850
851 let endpoint = conn
852 .endpoints
853 .first()
854 .ok_or_else(|| SubcError::NoEndpoint {
855 path: connection_file_path.to_path_buf(),
856 })?;
857 let endpoint_label = format!("{}:{}", endpoint.host, endpoint.port);
858 let ip = endpoint
859 .host
860 .parse::<IpAddr>()
861 .map_err(|_| SubcError::InvalidEndpoint {
862 path: connection_file_path.to_path_buf(),
863 endpoint: endpoint_label.clone(),
864 })?;
865 let addr = SocketAddr::new(ip, endpoint.port);
866
867 let mut stream = TcpStream::connect(addr)
868 .await
869 .map_err(|source| SubcError::Connect {
870 endpoint: endpoint_label.clone(),
871 source,
872 })?;
873
874 authenticate_client(&mut stream, &conn, AUTH_DEADLINE)
875 .await
876 .map_err(|source| SubcError::Auth {
877 endpoint: endpoint_label,
878 source,
879 })?;
880
881 Ok(stream)
882}
883
884async fn run_module_loop<R, W>(
888 mut read: R,
889 mut write: W,
890 shared_app: Arc<App>,
891 executor: Arc<Executor>,
892 dispatch: DispatchFn,
893 user_config_path: Option<PathBuf>,
894 allow_native_passthrough: bool,
895) -> Result<(), SubcError>
896where
897 R: AsyncRead + Unpin + Send + 'static,
898 W: AsyncWrite + Unpin + Send + 'static,
899{
900 let hello = ModuleHelloBody {
902 manifest: build_manifest(),
903 protocol_ver: PROTOCOL_VERSION,
904 control_ops: None,
905 };
906 let hello_frame = Frame::build(
907 FrameType::Hello,
908 control_flags(),
909 0,
910 HELLO_CORR,
911 serde_json::to_vec(&hello).map_err(SubcError::Json)?,
912 )
913 .map_err(SubcError::FrameBuild)?;
914 write_frame(&mut write, &hello_frame)
915 .await
916 .map_err(SubcError::FrameIo)?;
917
918 match read_frame(&mut read).await.map_err(SubcError::FrameIo)? {
920 None => return Err(SubcError::ClosedBeforeHelloAck),
921 Some(frame) => match frame.header.ty {
922 FrameType::HelloAck => {
923 log::info!("subc attach: registered (HelloAck received)");
924 }
925 FrameType::Error => {
926 let body = serde_json::from_slice::<ErrorBody>(&frame.body).ok();
927 return Err(SubcError::HelloRejected { body });
928 }
929 other => return Err(SubcError::UnexpectedFrame { ty: other }),
930 },
931 }
932
933 let (writer_tx, writer_rx) = mpsc::channel::<Frame>(256);
934 let writer_task = spawn_writer_task(write, writer_rx);
935 let (reader_tx, mut reader_rx) = mpsc::channel::<Result<Frame, SubcError>>(256);
942 let reader_task = spawn_reader_task(read, reader_tx);
943 let shutdown = Arc::new(Notify::new());
944 let mut drain_interval = tokio::time::interval(Duration::from_millis(250));
945 let (maintenance_tx, mut maintenance_rx) = mpsc::channel::<(ProjectRootId, Response)>(256);
946 let (control_completion_tx, mut control_completion_rx) =
947 mpsc::channel::<RouteBindCompletion>(256);
948 let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1024);
949 let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
950 let push_senders = PushSenders {
951 lossy_tx,
952 reliable_tx,
953 };
954 let mut routes: HashMap<RouteChannel, RouteIdentity> = HashMap::new();
955 let mut root_channels: HashMap<ProjectRootId, HashSet<RouteChannel>> = HashMap::new();
956 let mut session_identity: HashMap<(ProjectRootId, String), String> = HashMap::new();
957 let mut push_buffer: HashMap<ReplayKey, VecDeque<PushFrame>> = HashMap::new();
958 let mut retry_buffer: RetryBuffer = HashMap::new();
959 let mut completed_tasks = CompletedTaskIds::default();
960 let mut live_roots: HashMap<ProjectRootId, RootMeta> = HashMap::new();
961 let mut pending_binds: HashMap<RouteChannel, PendingBind> = HashMap::new();
962
963 let loop_result: Result<(), SubcError> = loop {
964 tokio::select! {
965 _ = shutdown.notified() => {
966 log::warn!("subc attach: fatal executor response requested teardown");
967 break Ok(());
968 }
969 maybe_frame = reader_rx.recv() => {
970 let frame = match maybe_frame {
971 None => {
972 log::info!("subc attach: daemon closed connection");
973 break Ok(());
974 }
975 Some(Err(error)) => break Err(error),
976 Some(Ok(frame)) => frame,
977 };
978
979 match frame.header.ty {
980 FrameType::Ping if frame.header.channel == 0 => {
981 let pong = match Frame::build_with_version(
982 frame.header.ver,
983 FrameType::Pong,
984 frame.header.flags,
985 0,
986 frame.header.corr,
987 Vec::new(),
988 ) {
989 Ok(pong) => pong,
990 Err(error) => break Err(SubcError::FrameBuild(error)),
991 };
992 if let Err(error) = send_frame(&writer_tx, pong).await {
993 break Err(error);
994 }
995 }
996 FrameType::Goodbye if frame.header.channel == 0 => {
997 log::info!("subc attach: received channel-0 Goodbye");
998 break Ok(());
999 }
1000 FrameType::Goodbye => {
1001 let channel = route_key(frame.header.channel);
1002 if let Some(pending) = pending_binds.get_mut(&channel) {
1003 pending.cancelled = true;
1004 log::debug!(
1005 "subc attach: cancelled pending RouteBind for route {} on Goodbye",
1006 frame.header.channel
1007 );
1008 }
1009 let migrated = migrate_retry_buffer_to_push_buffer(
1010 &mut retry_buffer,
1011 channel,
1012 &mut push_buffer,
1013 );
1014 if let Some(identity) = remove_route_channel(&mut routes, &mut root_channels, channel) {
1015 if migrated > 0 {
1016 log::debug!(
1017 "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from route {} into detach replay",
1018 frame.header.channel
1019 );
1020 }
1021 if let Some(meta) = live_roots.get_mut(&identity.root) {
1022 let idle_for = meta.last_touched.elapsed();
1023 meta.touch();
1024 log::debug!(
1025 "subc attach: route {} torn down for root {} harness {} session {} (last touched {:?} ago)",
1026 frame.header.channel,
1027 identity.root.as_path().display(),
1028 identity.harness,
1029 identity.session,
1030 idle_for
1031 );
1032 } else {
1033 log::debug!(
1034 "subc attach: route {} torn down for root {} harness {} session {}",
1035 frame.header.channel,
1036 identity.root.as_path().display(),
1037 identity.harness,
1038 identity.session
1039 );
1040 }
1041 } else {
1042 if migrated > 0 {
1043 log::debug!(
1044 "subc attach: migrated {migrated} retry-buffered reliable Push frame(s) from unbound route {} into detach replay",
1045 frame.header.channel
1046 );
1047 }
1048 log::debug!("subc attach: unbound route {} torn down", frame.header.channel);
1049 }
1050 }
1051 FrameType::Request if frame.header.channel == 0 => {
1052 if let Err(error) = handle_control_request(
1053 &writer_tx,
1054 &frame,
1055 &shared_app,
1056 &executor,
1057 &mut live_roots,
1058 &mut pending_binds,
1059 &control_completion_tx,
1060 &push_senders,
1061 dispatch,
1062 user_config_path.as_deref(),
1063 )
1064 .await
1065 {
1066 break Err(error);
1067 }
1068 }
1069 FrameType::Request => {
1070 if let Err(error) = handle_tool_call(
1071 &writer_tx,
1072 &frame,
1073 &routes,
1074 &pending_binds,
1075 &mut live_roots,
1076 &executor,
1077 &shutdown,
1078 dispatch,
1079 allow_native_passthrough,
1080 )
1081 .await
1082 {
1083 break Err(error);
1084 }
1085 }
1086 _ => {}
1090 }
1091 }
1092 Some((root_id, frame)) = reliable_rx.recv() => {
1093 let mut batch = vec![(root_id, frame)];
1097 while let Ok(item) = reliable_rx.try_recv() {
1098 batch.push(item);
1099 }
1100
1101 for (root, frame) in batch {
1102 process_reliable_push_frame(
1103 &writer_tx,
1104 &routes,
1105 &root_channels,
1106 &session_identity,
1107 &mut retry_buffer,
1108 &mut push_buffer,
1109 &mut completed_tasks,
1110 root,
1111 frame,
1112 );
1113 }
1114 }
1115 Some((root_id, frame)) = lossy_rx.recv() => {
1116 while let Ok((reliable_root, reliable_frame)) = reliable_rx.try_recv() {
1120 process_reliable_push_frame(
1121 &writer_tx,
1122 &routes,
1123 &root_channels,
1124 &session_identity,
1125 &mut retry_buffer,
1126 &mut push_buffer,
1127 &mut completed_tasks,
1128 reliable_root,
1129 reliable_frame,
1130 );
1131 }
1132
1133 let mut batch = vec![(root_id, frame)];
1137 while let Ok(item) = lossy_rx.try_recv() {
1138 batch.push(item);
1139 }
1140
1141 for (root, frame) in coalesce_push_batch(batch) {
1142 process_lossy_push_frame(
1143 &writer_tx,
1144 &routes,
1145 &root_channels,
1146 &completed_tasks,
1147 root,
1148 frame,
1149 );
1150 }
1151 }
1152 Some(completion) = control_completion_rx.recv() => {
1153 if let Err(error) = handle_route_bind_completion(
1154 &writer_tx,
1155 completion,
1156 &mut routes,
1157 &mut root_channels,
1158 &mut session_identity,
1159 &mut push_buffer,
1160 &mut live_roots,
1161 &mut pending_binds,
1162 &executor,
1163 &shutdown,
1164 )
1165 .await
1166 {
1167 break Err(error);
1168 }
1169 }
1170 Some((root_id, response)) = maintenance_rx.recv() => {
1171 if let Some(meta) = live_roots.get_mut(&root_id) {
1172 meta.maintenance_pending = false;
1173 }
1174 if response_is_internal_error(&response) {
1175 signal_fatal_teardown(&writer_tx, None, PROTOCOL_VERSION, 0, &shutdown).await;
1176 }
1177 }
1178 _ = drain_interval.tick() => {
1179 let retried = drain_retry_buffers_for_bound_routes(
1180 &writer_tx,
1181 &routes,
1182 &mut retry_buffer,
1183 );
1184 if retried > 0 {
1185 log::debug!(
1186 "subc attach: retried {retried} reliable Push frame(s) after writer backpressure"
1187 );
1188 }
1189
1190 let due_roots: Vec<ProjectRootId> = live_roots
1191 .iter_mut()
1192 .filter_map(|(root_id, meta)| {
1193 if meta.maintenance_pending {
1194 None
1195 } else {
1196 meta.maintenance_pending = true;
1197 Some(root_id.clone())
1198 }
1199 })
1200 .collect();
1201 for root_id in due_roots {
1202 submit_maintenance_drain(&executor, root_id, &maintenance_tx);
1203 }
1204 }
1205 }
1206 };
1207
1208 reader_task.abort();
1211 drop(writer_tx);
1212 let writer_result = finish_writer_task(writer_task).await;
1213 loop_result.and(writer_result)
1214}
1215
1216fn spawn_writer_task<W>(
1217 mut write: W,
1218 mut rx: mpsc::Receiver<Frame>,
1219) -> JoinHandle<Result<(), subc_transport::FrameIoError>>
1220where
1221 W: AsyncWrite + Unpin + Send + 'static,
1222{
1223 tokio::spawn(async move {
1224 while let Some(frame) = rx.recv().await {
1225 write_frame(&mut write, &frame).await?;
1226 }
1227 Ok(())
1228 })
1229}
1230
1231fn spawn_reader_task<R>(mut read: R, tx: mpsc::Sender<Result<Frame, SubcError>>) -> JoinHandle<()>
1238where
1239 R: AsyncRead + Unpin + Send + 'static,
1240{
1241 tokio::spawn(async move {
1242 loop {
1243 match read_frame(&mut read).await {
1244 Ok(Some(frame)) => {
1245 if tx.send(Ok(frame)).await.is_err() {
1246 return;
1247 }
1248 }
1249 Ok(None) => {
1250 return;
1252 }
1253 Err(error) => {
1254 let _ = tx.send(Err(SubcError::FrameIo(error))).await;
1255 return;
1256 }
1257 }
1258 }
1259 })
1260}
1261
1262async fn finish_writer_task(
1263 mut writer_task: JoinHandle<Result<(), subc_transport::FrameIoError>>,
1264) -> Result<(), SubcError> {
1265 match tokio::time::timeout(Duration::from_millis(100), &mut writer_task).await {
1266 Ok(Ok(Ok(()))) => Ok(()),
1267 Ok(Ok(Err(error))) => Err(SubcError::FrameIo(error)),
1268 Ok(Err(error)) => Err(SubcError::WriterJoin(error)),
1269 Err(_) => {
1270 writer_task.abort();
1271 Ok(())
1272 }
1273 }
1274}
1275
1276async fn send_frame(tx: &mpsc::Sender<Frame>, frame: Frame) -> Result<(), SubcError> {
1277 match tokio::time::timeout(CONTROL_SEND_TIMEOUT, tx.send(frame)).await {
1278 Ok(Ok(())) => Ok(()),
1279 Ok(Err(_)) => Err(SubcError::WriterClosed),
1280 Err(_) => Err(SubcError::WriterBackpressureTimeout),
1281 }
1282}
1283
1284fn rollback_pending_bind_actor(
1285 executor: &Arc<Executor>,
1286 live_roots: &HashMap<ProjectRootId, RootMeta>,
1287 root_id: &ProjectRootId,
1288 inserted_new_actor: bool,
1289) {
1290 if inserted_new_actor && !live_roots.contains_key(root_id) {
1291 executor.remove_actor(root_id);
1292 }
1293}
1294
1295async fn handle_route_bind_completion(
1296 tx: &mpsc::Sender<Frame>,
1297 completion: RouteBindCompletion,
1298 routes: &mut HashMap<RouteChannel, RouteIdentity>,
1299 root_channels: &mut HashMap<ProjectRootId, HashSet<RouteChannel>>,
1300 session_identity: &mut HashMap<(ProjectRootId, String), String>,
1301 push_buffer: &mut HashMap<ReplayKey, VecDeque<PushFrame>>,
1302 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1303 pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1304 executor: &Arc<Executor>,
1305 shutdown: &Arc<Notify>,
1306) -> Result<(), SubcError> {
1307 let route_id = route_key(completion.route_channel);
1308 let Some(pending) = pending_binds.remove(&route_id) else {
1309 log::warn!(
1310 "subc attach: dropping RouteBind completion for non-pending route {}",
1311 completion.route_channel
1312 );
1313 rollback_pending_bind_actor(
1314 executor,
1315 live_roots,
1316 &completion.bind_root_id,
1317 completion.inserted_new_actor,
1318 );
1319 return Ok(());
1320 };
1321
1322 if pending.bind_root_id != completion.bind_root_id {
1323 log::warn!(
1324 "subc attach: pending RouteBind root mismatch for route {} (pending {} completion {})",
1325 completion.route_channel,
1326 pending.bind_root_id.as_path().display(),
1327 completion.bind_root_id.as_path().display()
1328 );
1329 }
1330
1331 let inserted_new_actor = pending.inserted_new_actor || completion.inserted_new_actor;
1332 if pending.cancelled {
1333 rollback_pending_bind_actor(
1334 executor,
1335 live_roots,
1336 &completion.bind_root_id,
1337 inserted_new_actor,
1338 );
1339 log::debug!(
1340 "subc attach: discarded completed RouteBind for cancelled route {} root {}",
1341 completion.route_channel,
1342 completion.bind_root_id.as_path().display()
1343 );
1344 return Ok(());
1345 }
1346
1347 let failure = if !completion.configure_response.success {
1348 Some((
1349 &completion.configure_response,
1350 "configure failed during route bind",
1351 ))
1352 } else if let Some(drain_response) = completion.drain_response.as_ref() {
1353 if drain_response.success {
1354 None
1355 } else {
1356 Some((
1357 drain_response,
1358 "build-completion drain failed during route bind",
1359 ))
1360 }
1361 } else {
1362 None
1363 };
1364
1365 if let Some((response, fallback)) = failure {
1366 rollback_pending_bind_actor(
1367 executor,
1368 live_roots,
1369 &completion.bind_root_id,
1370 inserted_new_actor,
1371 );
1372 let message = response_message(response, fallback);
1373 let fatal = response_is_internal_error(response);
1374 send_route_bind_error_parts(
1375 tx,
1376 completion.ver,
1377 completion.corr,
1378 completion.flags,
1379 "config_divergence",
1380 &message,
1381 )
1382 .await?;
1383 if fatal {
1384 signal_fatal_teardown(
1385 tx,
1386 Some(completion.route_channel),
1387 completion.ver,
1388 completion.corr,
1389 shutdown,
1390 )
1391 .await;
1392 }
1393 return Ok(());
1394 }
1395
1396 remember_session_identity(session_identity, &completion.identity);
1397 let replay_key = ReplayKey::from_identity(&completion.identity);
1398 insert_route_channel(routes, root_channels, route_id, completion.identity);
1399 live_roots
1400 .entry(completion.bind_root_id.clone())
1401 .and_modify(|meta| {
1402 meta.touch();
1403 meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1404 })
1405 .or_insert_with(|| RootMeta::new(Instant::now()));
1406 if let Some(meta) = live_roots.get_mut(&completion.bind_root_id) {
1407 meta.diagnostics_on_edit = completion.diagnostics_on_edit;
1408 }
1409
1410 let ack =
1411 serde_json::to_vec(&ModuleControlResponse::RouteBindAck {}).map_err(SubcError::Json)?;
1412 let response = Frame::build_with_version(
1413 completion.ver,
1414 FrameType::Response,
1415 control_flags(),
1416 0,
1417 completion.corr,
1418 ack,
1419 )
1420 .map_err(SubcError::FrameBuild)?;
1421 send_frame(tx, response).await?;
1422 let replayed = replay_buffered_push_frames(tx, route_id, push_buffer, &replay_key);
1423 if replayed > 0 {
1424 log::debug!(
1425 "subc attach: replayed {} buffered Push frame(s) to route {} root {} harness {} session {}",
1426 replayed,
1427 completion.route_channel,
1428 replay_key.root.as_path().display(),
1429 replay_key.harness,
1430 replay_key.session
1431 );
1432 }
1433 log::info!(
1434 "subc attach: route {} bound to root {}",
1435 completion.route_channel,
1436 completion.bind_root_id.as_path().display()
1437 );
1438 Ok(())
1439}
1440
1441async fn handle_control_request(
1446 tx: &mpsc::Sender<Frame>,
1447 frame: &Frame,
1448 shared_app: &Arc<App>,
1449 executor: &Arc<Executor>,
1450 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1451 pending_binds: &mut HashMap<RouteChannel, PendingBind>,
1452 control_completion_tx: &mpsc::Sender<RouteBindCompletion>,
1453 push_senders: &PushSenders,
1454 dispatch: DispatchFn,
1455 user_config_path: Option<&Path>,
1456) -> Result<(), SubcError> {
1457 let request =
1458 serde_json::from_slice::<ModuleControlRequest>(&frame.body).map_err(SubcError::Json)?;
1459 match request {
1460 ModuleControlRequest::RouteBind {
1461 route_channel,
1462 target: _,
1463 identity,
1464 ..
1469 } => {
1470 let route_id = route_key(route_channel);
1471 if pending_binds.contains_key(&route_id) {
1472 return send_route_bind_error(
1473 tx,
1474 frame,
1475 "config_divergence",
1476 "route bind is already pending for channel",
1477 )
1478 .await;
1479 }
1480
1481 let bind_root_id = match ProjectRootId::from_path(&identity.project_root) {
1482 Ok(root_id) => root_id,
1483 Err(error) => {
1484 return send_route_bind_error(
1485 tx,
1486 frame,
1487 "config_divergence",
1488 &format!("invalid route project root: {error}"),
1489 )
1490 .await;
1491 }
1492 };
1493
1494 let request_id = format!("subc-bind-{route_channel}");
1497 let bind_project_root = identity.project_root.clone();
1498 let bind_harness = identity.harness.clone();
1499 let bind_session = identity.session.clone();
1500
1501 let local_tiers = crate::subc_config::read_local_cortexkit_config_tiers(
1513 user_config_path,
1514 Path::new(&bind_project_root),
1515 );
1516 let config_tiers: Vec<Value> = local_tiers
1517 .iter()
1518 .map(|t| json!({ "tier": t.tier, "source": t.source, "doc": t.doc }))
1519 .collect();
1520 let diagnostics_on_edit = diagnostics_on_edit_from_tiers(&local_tiers);
1521 let configure_json = json!({
1522 "id": request_id,
1523 "command": "configure",
1524 "project_root": bind_project_root,
1525 "harness": bind_harness,
1526 "session_id": bind_session.clone(),
1527 "config": config_tiers,
1528 });
1529 let configure_req = match serde_json::from_value::<RawRequest>(configure_json) {
1530 Ok(req) => req,
1531 Err(error) => {
1532 return send_route_bind_error(
1533 tx,
1534 frame,
1535 "config_divergence",
1536 &format!("failed to build configure request: {error}"),
1537 )
1538 .await;
1539 }
1540 };
1541
1542 let route_identity = RouteIdentity {
1543 root: bind_root_id.clone(),
1544 project_root: PathBuf::from(&bind_project_root),
1545 harness: bind_harness.clone(),
1546 session: bind_session.clone(),
1547 };
1548 let configure_session = route_identity.session.clone();
1549 let root_was_live = live_roots.contains_key(&bind_root_id);
1550 let inserted_new_actor = if root_was_live {
1551 log::debug!(
1552 "subc attach: reusing actor for route {} root {}",
1553 route_channel,
1554 bind_root_id.as_path().display()
1555 );
1556 false
1557 } else {
1558 let actor_ctx = Arc::new(AppContext::from_app(
1559 Arc::clone(shared_app),
1560 Config::default(),
1561 ));
1562 install_bash_compressor(&actor_ctx);
1563 actor_ctx.set_progress_sender(Some(progress_sender_for_root(
1564 push_senders.clone(),
1565 bind_root_id.clone(),
1566 )));
1567 let inserted =
1568 executor.register_actor(bind_root_id.clone(), Arc::clone(&actor_ctx));
1569 drop(actor_ctx);
1570 log::debug!(
1574 "subc attach: registered actor for route {} root {}",
1575 route_channel,
1576 bind_root_id.as_path().display()
1577 );
1578 inserted
1579 };
1580
1581 pending_binds.insert(
1582 route_id,
1583 PendingBind {
1584 bind_root_id: bind_root_id.clone(),
1585 inserted_new_actor,
1586 cancelled: false,
1587 },
1588 );
1589
1590 let configure_request_id = configure_req.id.clone();
1591 let configure_rx = executor.submit_async(
1592 bind_root_id.clone(),
1593 Lane::Mutating,
1594 configure_request_id.clone(),
1595 Box::new(move |ctx| {
1596 log_ctx::with_session(Some(configure_session.clone()), || {
1597 dispatch(configure_req, ctx)
1598 })
1599 }),
1600 );
1601
1602 let completion_tx = control_completion_tx.clone();
1603 let completion_executor = Arc::clone(executor);
1604 let completion_identity = route_identity;
1605 let completion_root = bind_root_id.clone();
1606 let completion_route_channel = route_channel;
1607 let completion_ver = frame.header.ver;
1608 let completion_corr = frame.header.corr;
1609 let completion_flags = frame.header.flags;
1610 tokio::spawn(async move {
1611 let configure_response =
1612 await_executor_response(configure_rx, configure_request_id.clone()).await;
1613 let drain_response = if configure_response.success && !root_was_live {
1614 let drain_request_id = format!("subc-bind-drain-{completion_route_channel}");
1615 let drain_response_id = drain_request_id.clone();
1616 let drain_rx = completion_executor.submit_async(
1617 completion_root.clone(),
1618 Lane::Mutating,
1619 drain_request_id.clone(),
1620 Box::new(move |ctx| {
1621 runtime_drain::drain_build_completions(ctx);
1622 Response::success(drain_response_id, json!({ "drained": true }))
1623 }),
1624 );
1625 Some(await_executor_response(drain_rx, drain_request_id).await)
1626 } else {
1627 None
1628 };
1629
1630 let completion = RouteBindCompletion {
1631 route_channel: completion_route_channel,
1632 identity: completion_identity,
1633 bind_root_id: completion_root,
1634 inserted_new_actor,
1635 configure_response,
1636 drain_response,
1637 diagnostics_on_edit,
1638 ver: completion_ver,
1639 corr: completion_corr,
1640 flags: completion_flags,
1641 };
1642 if completion_tx.send(completion).await.is_err() {
1643 log::debug!(
1644 "subc attach: dropped RouteBind completion for route {} after loop exit",
1645 completion_route_channel
1646 );
1647 }
1648 });
1649
1650 Ok(())
1651 }
1652 }
1653}
1654
1655fn install_bash_compressor(ctx: &AppContext) {
1656 let filter_registry_handle = ctx.shared_filter_registry();
1658 let compress_flag = ctx.bash_compress_flag();
1659 ctx.bash_background().set_compressor_with_exit_code(
1660 move |command: &str, output: String, exit_code: Option<i32>| {
1661 if !compress_flag.load(std::sync::atomic::Ordering::Relaxed) {
1662 return crate::compress::CompressionResult::new(output);
1663 }
1664 let registry_guard = match filter_registry_handle.read() {
1665 Ok(g) => g,
1666 Err(poisoned) => poisoned.into_inner(),
1667 };
1668 crate::compress::compress_with_registry_exit_code(
1669 command,
1670 &output,
1671 exit_code,
1672 ®istry_guard,
1673 )
1674 },
1675 );
1676}
1677
1678fn diagnostics_on_edit_from_tiers(tiers: &[ConfigTier]) -> bool {
1679 let mut diagnostics_on_edit = false;
1680 for tier in tiers {
1681 if let Some(value) = diagnostics_on_edit_from_doc(&tier.doc) {
1682 diagnostics_on_edit = value;
1683 }
1684 }
1685 diagnostics_on_edit
1686}
1687
1688fn diagnostics_on_edit_from_doc(doc: &str) -> Option<bool> {
1689 let stripped = strip_jsonc_for_subc(doc);
1690 let value = serde_json::from_str::<Value>(&stripped).ok()?;
1691 value
1692 .get("lsp")
1693 .and_then(Value::as_object)?
1694 .get("diagnostics_on_edit")
1695 .and_then(Value::as_bool)
1696}
1697
1698fn strip_jsonc_for_subc(source: &str) -> String {
1699 strip_trailing_commas_for_subc(&strip_jsonc_comments_for_subc(source))
1700}
1701
1702fn strip_jsonc_comments_for_subc(source: &str) -> String {
1703 let mut output = String::with_capacity(source.len());
1704 let mut chars = source.chars().peekable();
1705 let mut in_string = false;
1706 let mut escaped = false;
1707
1708 while let Some(ch) = chars.next() {
1709 if in_string {
1710 output.push(ch);
1711 if escaped {
1712 escaped = false;
1713 } else if ch == '\\' {
1714 escaped = true;
1715 } else if ch == '"' {
1716 in_string = false;
1717 }
1718 continue;
1719 }
1720
1721 if ch == '"' {
1722 in_string = true;
1723 output.push(ch);
1724 continue;
1725 }
1726
1727 if ch == '/' {
1728 match chars.peek().copied() {
1729 Some('/') => {
1730 chars.next();
1731 for next in chars.by_ref() {
1732 if next == '\n' {
1733 output.push('\n');
1734 break;
1735 }
1736 }
1737 }
1738 Some('*') => {
1739 chars.next();
1740 let mut previous = '\0';
1741 for next in chars.by_ref() {
1742 if next == '\n' {
1743 output.push('\n');
1744 }
1745 if previous == '*' && next == '/' {
1746 break;
1747 }
1748 previous = next;
1749 }
1750 }
1751 _ => output.push(ch),
1752 }
1753 continue;
1754 }
1755
1756 output.push(ch);
1757 }
1758
1759 output
1760}
1761
1762fn strip_trailing_commas_for_subc(source: &str) -> String {
1763 let chars = source.chars().collect::<Vec<_>>();
1764 let mut output = String::with_capacity(source.len());
1765 let mut index = 0usize;
1766 let mut in_string = false;
1767 let mut escaped = false;
1768
1769 while index < chars.len() {
1770 let ch = chars[index];
1771 if in_string {
1772 output.push(ch);
1773 if escaped {
1774 escaped = false;
1775 } else if ch == '\\' {
1776 escaped = true;
1777 } else if ch == '"' {
1778 in_string = false;
1779 }
1780 index += 1;
1781 continue;
1782 }
1783
1784 if ch == '"' {
1785 in_string = true;
1786 output.push(ch);
1787 index += 1;
1788 continue;
1789 }
1790
1791 if ch == ',' {
1792 let mut next = index + 1;
1793 while next < chars.len() && chars[next].is_whitespace() {
1794 next += 1;
1795 }
1796 if next < chars.len() && matches!(chars[next], '}' | ']') {
1797 index += 1;
1798 continue;
1799 }
1800 }
1801
1802 output.push(ch);
1803 index += 1;
1804 }
1805
1806 output
1807}
1808
1809async fn send_route_bind_error(
1810 tx: &mpsc::Sender<Frame>,
1811 frame: &Frame,
1812 code: &str,
1813 message: &str,
1814) -> Result<(), SubcError> {
1815 send_route_bind_error_parts(
1816 tx,
1817 frame.header.ver,
1818 frame.header.corr,
1819 frame.header.flags,
1820 code,
1821 message,
1822 )
1823 .await
1824}
1825
1826async fn send_route_bind_error_parts(
1827 tx: &mpsc::Sender<Frame>,
1828 ver: u8,
1829 corr: u64,
1830 flags: Flags,
1831 code: &str,
1832 message: &str,
1833) -> Result<(), SubcError> {
1834 let response = build_error_frame(ver, 0, corr, flags, code, message)?;
1835 send_frame(tx, response).await?;
1836 log::warn!("subc attach: route bind rejected ({code}): {message}");
1837 Ok(())
1838}
1839
1840async fn handle_tool_call(
1845 tx: &mpsc::Sender<Frame>,
1846 frame: &Frame,
1847 routes: &HashMap<RouteChannel, RouteIdentity>,
1848 pending_binds: &HashMap<RouteChannel, PendingBind>,
1849 live_roots: &mut HashMap<ProjectRootId, RootMeta>,
1850 executor: &Arc<Executor>,
1851 shutdown: &Arc<Notify>,
1852 dispatch: DispatchFn,
1853 allow_native_passthrough: bool,
1854) -> Result<(), SubcError> {
1855 let route_id = route_key(frame.header.channel);
1856 if pending_binds.contains_key(&route_id) {
1857 let error = build_error_frame(
1858 frame.header.ver,
1859 frame.header.channel,
1860 frame.header.corr,
1861 frame.header.flags,
1862 "route_not_bound",
1863 "route is not bound before tool call",
1864 )?;
1865 return send_frame(tx, error).await;
1866 }
1867
1868 let Some(identity) = routes.get(&route_id).cloned() else {
1869 let error = build_error_frame(
1870 frame.header.ver,
1871 frame.header.channel,
1872 frame.header.corr,
1873 frame.header.flags,
1874 "route_not_bound",
1875 "route is not bound before tool call",
1876 )?;
1877 return send_frame(tx, error).await;
1878 };
1879 if let Some(meta) = live_roots.get_mut(&identity.root) {
1880 meta.touch();
1881 }
1882
1883 let call = serde_json::from_slice::<ToolCallRequest>(&frame.body).map_err(SubcError::Json)?;
1884 let bare_name = call.name.clone();
1885 let format_context = crate::subc_format::FormatContext::from_tool_call(
1886 &bare_name,
1887 &call.arguments,
1888 identity.project_root.as_path(),
1889 );
1890
1891 let request_id = format!("subc-{}-{}", frame.header.channel, frame.header.corr);
1892 let project_root = identity.project_root.as_path();
1893 let diagnostics_on_edit = live_roots
1894 .get(&identity.root)
1895 .map(|meta| meta.diagnostics_on_edit)
1896 .unwrap_or(false);
1897 let translate_context = crate::subc_translate::TranslateContext {
1898 diagnostics_on_edit,
1899 };
1900 let (command, translated_args) = match crate::subc_translate::subc_translate_with_context(
1901 &call.name,
1902 &call.arguments,
1903 project_root,
1904 translate_context,
1905 ) {
1906 Ok(t) => (t.command, t.args),
1907 Err(err) if is_subc_agent_core_tool(&call.name) => {
1910 let response = Response::error(request_id.clone(), err.code, err.message);
1911 let response_frame = build_tool_response_frame(
1912 frame.header.ver,
1913 frame.header.channel,
1914 frame.header.corr,
1915 frame.header.flags,
1916 &bare_name,
1917 &format_context,
1918 &response,
1919 )?;
1920 return send_frame(tx, response_frame).await;
1921 }
1922 Err(_) if !allow_native_passthrough => {
1930 log::warn!(
1931 "subc tool call: rejecting non-manifest tool name {:?} on route {} (fail-closed)",
1932 call.name,
1933 frame.header.channel
1934 );
1935 let response = Response::error(
1936 request_id.clone(),
1937 "unknown_tool",
1938 format!("tool {:?} is not in the AFT tool manifest", call.name),
1939 );
1940 let response_frame = build_tool_response_frame(
1941 frame.header.ver,
1942 frame.header.channel,
1943 frame.header.corr,
1944 frame.header.flags,
1945 &bare_name,
1946 &format_context,
1947 &response,
1948 )?;
1949 return send_frame(tx, response_frame).await;
1950 }
1951 Err(_) => {
1954 let map = call.arguments.as_object().cloned().unwrap_or_default();
1955 (call.name.clone(), map)
1956 }
1957 };
1958
1959 let lane = command_lane(&command);
1960 let command_for_finalize = command.clone();
1961 let session_for_finalize = identity.session.clone();
1962 let mut map = translated_args;
1963 map.insert("id".to_string(), json!(request_id.clone()));
1964 map.insert("command".to_string(), json!(command));
1965 map.insert("session_id".to_string(), json!(identity.session.clone()));
1966
1967 let raw_req = match serde_json::from_value::<RawRequest>(Value::Object(map)) {
1968 Ok(req) => req,
1969 Err(error) => {
1970 let response = Response::error(
1971 request_id.clone(),
1972 "invalid_request",
1973 format!("failed to build request from tool call: {error}"),
1974 );
1975 let response_frame = build_tool_response_frame(
1976 frame.header.ver,
1977 frame.header.channel,
1978 frame.header.corr,
1979 frame.header.flags,
1980 &bare_name,
1981 &format_context,
1982 &response,
1983 )?;
1984 return send_frame(tx, response_frame).await;
1985 }
1986 };
1987
1988 let bare_name_for_frame = bare_name.clone();
1989 let format_context_for_frame = format_context;
1990 let rx = executor.submit_async(
1991 identity.root,
1992 lane,
1993 request_id.clone(),
1994 Box::new(move |ctx| {
1995 log_ctx::with_session(Some(session_for_finalize.clone()), || {
1996 let mut response = dispatch(raw_req, ctx);
1997 crate::response_finalize::attach_bg_completions(
1998 &mut response,
1999 ctx,
2000 &session_for_finalize,
2001 &command_for_finalize,
2002 );
2003 crate::response_finalize::attach_status_bar(
2004 &mut response,
2005 ctx,
2006 &command_for_finalize,
2007 );
2008 response
2009 })
2010 }),
2011 );
2012 let completion_tx = tx.clone();
2013 let completion_shutdown = Arc::clone(shutdown);
2014 let route_channel = frame.header.channel;
2015 let corr = frame.header.corr;
2016 let flags = frame.header.flags;
2017 let ver = frame.header.ver;
2018 let is_mutating = lane == Lane::Mutating;
2019 tokio::spawn(async move {
2020 let response = await_executor_response(rx, request_id.clone()).await;
2021 let fatal = is_mutating && response_is_internal_error(&response);
2022 match build_tool_response_frame(
2023 ver,
2024 route_channel,
2025 corr,
2026 flags,
2027 &bare_name_for_frame,
2028 &format_context_for_frame,
2029 &response,
2030 ) {
2031 Ok(response_frame) => {
2032 let _ = completion_tx.send(response_frame).await;
2033 }
2034 Err(error) => {
2035 log::error!("subc attach: failed to build tool response frame: {error}");
2036 }
2037 }
2038 if fatal {
2039 signal_fatal_teardown(
2040 &completion_tx,
2041 Some(route_channel),
2042 ver,
2043 corr,
2044 &completion_shutdown,
2045 )
2046 .await;
2047 }
2048 });
2049 Ok(())
2050}
2051
2052fn submit_maintenance_drain(
2053 executor: &Arc<Executor>,
2054 root_id: ProjectRootId,
2055 completion_tx: &mpsc::Sender<(ProjectRootId, Response)>,
2056) {
2057 let request_id = format!(
2058 "subc-maintenance-drain-{}",
2059 root_id.as_path().to_string_lossy()
2060 );
2061 let response_id = request_id.clone();
2062 let completion_root_id = root_id.clone();
2063 let rx = executor.submit_async(
2064 root_id,
2065 Lane::Mutating,
2066 request_id.clone(),
2067 Box::new(move |ctx| {
2068 runtime_drain::drain_configure_warning_events(ctx);
2069 runtime_drain::drain_search_index_events(ctx);
2070 runtime_drain::drain_callgraph_store_events(ctx);
2071 runtime_drain::drain_semantic_index_events(ctx);
2072 runtime_drain::drain_semantic_refresh_events(ctx);
2073 runtime_drain::drain_inspect_events(ctx);
2074 runtime_drain::drain_watcher_events(ctx);
2075 runtime_drain::drain_lsp_events(ctx);
2076 Response::success(response_id, json!({ "drained": true }))
2077 }),
2078 );
2079 let completion_tx = completion_tx.clone();
2080 tokio::spawn(async move {
2081 let response = await_executor_response(rx, request_id).await;
2082 let _ = completion_tx.send((completion_root_id, response)).await;
2083 });
2084}
2085
2086async fn await_executor_response(rx: oneshot::Receiver<Response>, request_id: String) -> Response {
2087 rx.await
2088 .unwrap_or_else(|_| Response::error(request_id, "internal_error", "executor dropped"))
2089}
2090
2091fn build_tool_response_frame(
2092 ver: u8,
2093 route_channel: u16,
2094 corr: u64,
2095 flags: Flags,
2096 bare_name: &str,
2097 format_context: &crate::subc_format::FormatContext,
2098 response: &Response,
2099) -> Result<Frame, SubcError> {
2100 let text =
2101 crate::subc_format::format_response_with_context(bare_name, response, format_context);
2102 let is_error = !response.success;
2103 let result = json!({
2104 "content": [{ "type": "text", "text": text }],
2105 "isError": is_error,
2106 });
2107 let body = serde_json::to_vec(&result).map_err(SubcError::Json)?;
2108
2109 Frame::build_with_version(ver, FrameType::Response, flags, route_channel, corr, body)
2110 .map_err(SubcError::FrameBuild)
2111}
2112
2113fn build_error_frame(
2114 ver: u8,
2115 channel: u16,
2116 corr: u64,
2117 flags: Flags,
2118 code: &str,
2119 message: &str,
2120) -> Result<Frame, SubcError> {
2121 let body = serde_json::to_vec(&ErrorBody {
2122 code: code.to_string(),
2123 message: message.to_string(),
2124 })
2125 .map_err(SubcError::Json)?;
2126 Frame::build_with_version(ver, FrameType::Error, flags, channel, corr, body)
2127 .map_err(SubcError::FrameBuild)
2128}
2129
2130fn build_goodbye_frame(ver: u8, channel: u16, corr: u64) -> Result<Frame, SubcError> {
2131 Frame::build_with_version(
2132 ver,
2133 FrameType::Goodbye,
2134 control_flags(),
2135 channel,
2136 corr,
2137 Vec::new(),
2138 )
2139 .map_err(SubcError::FrameBuild)
2140}
2141
2142async fn signal_fatal_teardown(
2143 tx: &mpsc::Sender<Frame>,
2144 route_channel: Option<u16>,
2145 ver: u8,
2146 corr: u64,
2147 shutdown: &Arc<Notify>,
2148) {
2149 if let Some(route_channel) = route_channel {
2150 if let Ok(frame) = build_goodbye_frame(ver, route_channel, corr) {
2151 if let Err(error) = send_frame(tx, frame).await {
2152 log::warn!(
2153 "subc attach: failed to queue fatal route Goodbye for route {route_channel}: {error}"
2154 );
2155 }
2156 }
2157 }
2158 if let Ok(frame) = build_goodbye_frame(ver, 0, 0) {
2159 if let Err(error) = send_frame(tx, frame).await {
2160 log::warn!("subc attach: failed to queue fatal channel-0 Goodbye: {error}");
2161 }
2162 }
2163 shutdown.notify_one();
2164}
2165
2166fn response_message(response: &Response, fallback: &str) -> String {
2167 response
2168 .data
2169 .get("message")
2170 .and_then(Value::as_str)
2171 .map(ToOwned::to_owned)
2172 .unwrap_or_else(|| fallback.to_string())
2173}
2174
2175fn response_is_internal_error(response: &Response) -> bool {
2176 !response.success && response.data.get("code").and_then(Value::as_str) == Some("internal_error")
2177}
2178
2179fn is_subc_agent_core_tool(name: &str) -> bool {
2180 matches!(
2181 name,
2182 "status" | "read" | "write" | "edit" | "grep" | "search" | "outline" | "inspect"
2183 )
2184}
2185
2186fn command_lane(command: &str) -> Lane {
2187 match command {
2188 "ping"
2189 | "version"
2190 | "echo"
2191 | "bash_drain_completions"
2192 | "bash_regex_match"
2193 | "db_get_state"
2194 | "db_get_host_state"
2195 | "read"
2196 | "undo_preview"
2197 | "edit_history"
2198 | "checkpoint_paths"
2199 | "list_checkpoints"
2200 | "glob"
2201 | "grep"
2202 | "git_conflicts"
2203 | "ast_search" => Lane::PureRead,
2204
2205 "bash_status" | "outline" | "zoom" => Lane::PureRead,
2209
2210 "status"
2211 | "inspect"
2212 | "lsp_diagnostics"
2213 | "lsp_inspect"
2214 | "lsp_hover"
2215 | "lsp_goto_definition"
2216 | "lsp_find_references"
2217 | "lsp_prepare_rename" => Lane::SerialLspStatus,
2218
2219 "semantic_search" | "search" | "callers" | "impact" | "call_tree" | "trace_to"
2220 | "trace_to_symbol" | "trace_data" | "inspect_tier2_run" => Lane::HeavyInit,
2221
2222 "bash"
2223 | "bash_ack_completions"
2224 | "bash_notify"
2225 | "bash_unnotify"
2226 | "bash_promote"
2227 | "bash_kill"
2228 | "bash_write"
2229 | "db_set_state"
2230 | "db_set_host_state"
2231 | "undo"
2232 | "checkpoint"
2233 | "restore_checkpoint"
2234 | "write"
2235 | "delete_file"
2236 | "move_file"
2237 | "edit"
2238 | "edit_symbol"
2239 | "edit_match"
2240 | "batch"
2241 | "add_import"
2242 | "remove_import"
2243 | "organize_imports"
2244 | "configure"
2245 | "move_symbol"
2246 | "extract_function"
2247 | "inline_symbol"
2248 | "ast_replace"
2249 | "lsp_rename"
2250 | "list_filters"
2251 | "trust_filter_project"
2252 | "untrust_filter_project"
2253 | "snapshot" => Lane::Mutating,
2254
2255 _ => Lane::Mutating,
2256 }
2257}
2258
2259#[derive(Debug, Deserialize)]
2260struct ToolCallRequest {
2261 name: String,
2262 #[serde(default)]
2263 arguments: Value,
2264}
2265
2266static SUBC_TOOL_SCHEMAS: LazyLock<serde_json::Map<String, Value>> = LazyLock::new(|| {
2267 serde_json::from_str(include_str!("subc_tool_schemas.json"))
2268 .unwrap_or_else(|e| panic!("subc_tool_schemas.json: {e}"))
2269});
2270
2271fn tool_schema(name: &str) -> Value {
2272 SUBC_TOOL_SCHEMAS.get(name).cloned().unwrap_or_else(|| {
2273 log::warn!(
2274 "subc build_manifest: missing embedded schema for tool {name:?}; using placeholder"
2275 );
2276 json!({ "type": "object" })
2277 })
2278}
2279
2280fn build_manifest() -> ModuleManifest {
2285 let tool = |name: &str, execution_mode: ExecutionMode| Tool {
2286 name: name.to_string(),
2287 execution_mode,
2288 schema: tool_schema(name),
2289 };
2290 ModuleManifest {
2297 module_id: "aft".to_string(),
2298 module_version: env!("CARGO_PKG_VERSION").to_string(),
2299 protocol_ver: PROTOCOL_VERSION,
2300 trust_tier: TrustTier::FirstParty,
2301 provides: vec![ProviderRole::ToolProvider {
2302 tools: vec![
2303 tool("status", ExecutionMode::Pure),
2304 tool("read", ExecutionMode::Pure),
2305 tool("grep", ExecutionMode::Pure),
2306 tool("search", ExecutionMode::Pure),
2307 tool("outline", ExecutionMode::Pure),
2308 tool("inspect", ExecutionMode::Pure),
2309 tool("edit", ExecutionMode::Mutating),
2310 tool("write", ExecutionMode::Mutating),
2311 ],
2312 identity_scope: vec![IdentityScope::Session, IdentityScope::Project],
2313 concurrency: Concurrency::ModuleManaged,
2314 emits_push: true,
2315 sub_supervises: true,
2316 }],
2317 consumes: Vec::new(),
2318 scheduled_tasks: Vec::new(),
2319 bindings: Bindings {
2320 storage: StorageBinding {
2321 kind: StorageKind::Sqlite,
2322 scope: StorageScope::Project,
2323 owns_schema: true,
2324 },
2325 config: ConfigBinding {
2326 source: ConfigSource::SubcMediated,
2327 tiers: vec!["user".to_string(), "project".to_string()],
2328 expansion: std::collections::BTreeMap::new(),
2329 },
2330 vault_grants: Vec::new(),
2331 identity: IdentityBinding {
2332 requires: vec![IdentityScope::Project],
2333 optional: vec![IdentityScope::Session],
2334 },
2335 },
2336 }
2337}
2338
2339fn control_flags() -> Flags {
2340 Flags::new(false, Priority::Passive, false)
2341}
2342
2343#[cfg(test)]
2344mod tests {
2345 use super::*;
2346 use crate::bash_background::BgTaskStatus;
2347 use crate::protocol::{
2348 BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, ConfigureWarningsFrame,
2349 ProgressFrame, StatusChangedFrame,
2350 };
2351 use serde_json::json;
2352
2353 fn test_root(name: &str) -> (tempfile::TempDir, ProjectRootId) {
2354 let dir = tempfile::Builder::new()
2355 .prefix(name)
2356 .tempdir()
2357 .expect("temp root");
2358 let root = ProjectRootId::from_path(dir.path()).expect("project root id");
2359 (dir, root)
2360 }
2361
2362 fn status_frame(seq: u64) -> PushFrame {
2363 status_frame_with_session(seq, None)
2364 }
2365
2366 fn status_frame_with_session(seq: u64, session_id: Option<&str>) -> PushFrame {
2367 PushFrame::StatusChanged(StatusChangedFrame {
2368 frame_type: "status_changed",
2369 session_id: session_id.map(str::to_string),
2370 snapshot: json!({ "seq": seq }),
2371 })
2372 }
2373
2374 fn completion_frame(task_id: &str) -> PushFrame {
2375 completion_frame_with_session(task_id, "session-1")
2376 }
2377
2378 fn completion_frame_with_session(task_id: &str, session_id: &str) -> PushFrame {
2379 PushFrame::BashCompleted(BashCompletedFrame {
2380 frame_type: "bash_completed",
2381 task_id: task_id.to_string(),
2382 session_id: session_id.to_string(),
2383 status: BgTaskStatus::Completed,
2384 exit_code: Some(0),
2385 command: format!("echo {task_id}"),
2386 output_preview: String::new(),
2387 output_truncated: false,
2388 original_tokens: None,
2389 compressed_tokens: None,
2390 tokens_skipped: false,
2391 })
2392 }
2393
2394 fn long_running_frame(task_id: &str, elapsed_ms: u64) -> PushFrame {
2395 long_running_frame_with_session(task_id, "session-1", elapsed_ms)
2396 }
2397
2398 fn long_running_frame_with_session(
2399 task_id: &str,
2400 session_id: &str,
2401 elapsed_ms: u64,
2402 ) -> PushFrame {
2403 PushFrame::BashLongRunning(BashLongRunningFrame {
2404 frame_type: "bash_long_running",
2405 task_id: task_id.to_string(),
2406 session_id: session_id.to_string(),
2407 command: format!("sleep {elapsed_ms}"),
2408 elapsed_ms,
2409 })
2410 }
2411
2412 fn pattern_match_frame(session_id: &str) -> PushFrame {
2413 PushFrame::BashPatternMatch(BashPatternMatchFrame {
2414 frame_type: "bash_pattern_match",
2415 task_id: "task-pattern".to_string(),
2416 session_id: session_id.to_string(),
2417 watch_id: "watch-1".to_string(),
2418 match_text: "needle".to_string(),
2419 match_offset: 7,
2420 context: "haystack needle".to_string(),
2421 once: true,
2422 reason: "pattern_match",
2423 })
2424 }
2425
2426 fn configure_warnings_frame(session_id: Option<&str>) -> PushFrame {
2427 PushFrame::ConfigureWarnings(ConfigureWarningsFrame {
2428 frame_type: "configure_warnings",
2429 session_id: session_id.map(str::to_string),
2430 project_root: "/tmp/subc-test".to_string(),
2431 source_file_count: 0,
2432 source_file_count_exceeds_max: false,
2433 max_callgraph_files: 0,
2434 warnings: Vec::new(),
2435 })
2436 }
2437
2438 fn route_identity(root: &ProjectRootId, session_id: &str) -> RouteIdentity {
2439 RouteIdentity {
2440 root: root.clone(),
2441 project_root: root.as_path().to_path_buf(),
2442 harness: "opencode".to_string(),
2443 session: session_id.to_string(),
2444 }
2445 }
2446
2447 fn progress_frame(request_id: &str, kind: ProgressKind, chunk: &str) -> PushFrame {
2448 PushFrame::Progress(ProgressFrame::new(request_id, kind, chunk))
2449 }
2450
2451 fn status_seq(frame: &PushFrame) -> Option<u64> {
2452 match frame {
2453 PushFrame::StatusChanged(status) => status.snapshot.get("seq").and_then(|v| v.as_u64()),
2454 _ => None,
2455 }
2456 }
2457
2458 fn completion_task(frame: &PushFrame) -> Option<&str> {
2459 match frame {
2460 PushFrame::BashCompleted(completion) => Some(completion.task_id.as_str()),
2461 _ => None,
2462 }
2463 }
2464
2465 fn push_frame_task_id(frame: &Frame) -> Option<String> {
2466 let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2467 body.get("task_id")
2468 .and_then(serde_json::Value::as_str)
2469 .map(str::to_string)
2470 }
2471
2472 #[test]
2473 fn frame_classification_matches_push_delivery_contract() {
2474 let completion = completion_frame_with_session("done", "session-a");
2475 assert_eq!(frame_session(&completion), Some("session-a"));
2476 assert!(frame_is_reliable(&completion));
2477
2478 let long_running = long_running_frame_with_session("long", "session-b", 42);
2479 assert_eq!(frame_session(&long_running), Some("session-b"));
2480 assert!(!frame_is_reliable(&long_running));
2481
2482 let pattern_match = pattern_match_frame("session-c");
2483 assert_eq!(frame_session(&pattern_match), Some("session-c"));
2484 assert!(frame_is_reliable(&pattern_match));
2485
2486 let tagged_warnings = configure_warnings_frame(Some("session-d"));
2487 assert_eq!(frame_session(&tagged_warnings), Some("session-d"));
2488 assert!(frame_is_reliable(&tagged_warnings));
2489
2490 let untagged_warnings = configure_warnings_frame(None);
2491 assert_eq!(frame_session(&untagged_warnings), None);
2492 assert!(frame_is_reliable(&untagged_warnings));
2493
2494 let tagged_status = status_frame_with_session(1, Some("session-e"));
2495 assert_eq!(frame_session(&tagged_status), Some("session-e"));
2496 assert!(!frame_is_reliable(&tagged_status));
2497
2498 let project_status = status_frame(2);
2499 assert_eq!(frame_session(&project_status), None);
2500 assert!(!frame_is_reliable(&project_status));
2501
2502 let progress = progress_frame("request-1", ProgressKind::Stdout, "chunk");
2503 assert_eq!(frame_session(&progress), None);
2504 assert!(!frame_is_reliable(&progress));
2505 }
2506
2507 #[test]
2508 fn fan_out_push_frame_routes_session_scoped_and_project_scoped_frames() {
2509 let (_root_dir, root) = test_root("subc-session-routing-root");
2510 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(8);
2511 let identity1 = route_identity(&root, "session-1");
2512 let identity2 = route_identity(&root, "session-2");
2513 let mut routes = HashMap::new();
2514 routes.insert(route_key(1), identity1.clone());
2515 routes.insert(route_key(2), identity2.clone());
2516 let mut root_channels = HashMap::new();
2517 root_channels.insert(root.clone(), HashSet::from([route_key(1), route_key(2)]));
2518 let mut session_identity = HashMap::new();
2519 remember_session_identity(&mut session_identity, &identity1);
2520 remember_session_identity(&mut session_identity, &identity2);
2521 let mut retry_buffer = HashMap::new();
2522 let mut push_buffer = HashMap::new();
2523
2524 let session_result = fan_out_reliable_push_frame(
2525 &writer_tx,
2526 &routes,
2527 &root_channels,
2528 &session_identity,
2529 &mut retry_buffer,
2530 &mut push_buffer,
2531 &root,
2532 &completion_frame_with_session("session-only", "session-1"),
2533 );
2534 assert_eq!(
2535 session_result,
2536 FanOutResult {
2537 matched_channels: 1,
2538 sent_frames: 1,
2539 }
2540 );
2541 assert!(retry_buffer.is_empty());
2542 assert!(push_buffer.is_empty());
2543 let session_push = writer_rx.try_recv().expect("session push queued");
2544 assert_eq!(session_push.header.ty, FrameType::Push);
2545 assert_eq!(session_push.header.channel, 1);
2546 assert!(
2547 writer_rx.try_recv().is_err(),
2548 "session-scoped frame must not broadcast to sibling sessions"
2549 );
2550
2551 let project_result =
2552 fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(9));
2553 assert_eq!(
2554 project_result,
2555 FanOutResult {
2556 matched_channels: 2,
2557 sent_frames: 2,
2558 }
2559 );
2560 let project_channels: HashSet<_> = [
2561 writer_rx
2562 .try_recv()
2563 .expect("first project push")
2564 .header
2565 .channel,
2566 writer_rx
2567 .try_recv()
2568 .expect("second project push")
2569 .header
2570 .channel,
2571 ]
2572 .into_iter()
2573 .collect();
2574 assert_eq!(project_channels, HashSet::from([1, 2]));
2575 assert!(writer_rx.try_recv().is_err());
2576 }
2577
2578 #[test]
2579 fn push_buffer_drops_oldest_per_replay_key() {
2580 let (_root_dir, root) = test_root("subc-buffer-bound-root");
2581 let key = ReplayKey {
2582 root,
2583 harness: "opencode".to_string(),
2584 session: "session-1".to_string(),
2585 };
2586 let mut push_buffer = HashMap::new();
2587 let total = PUSH_BUFFER_MAX_PER_KEY + 3;
2588
2589 for index in 0..total {
2590 buffer_push_frame(
2591 &mut push_buffer,
2592 key.clone(),
2593 completion_frame(&format!("task-{index}")),
2594 );
2595 }
2596
2597 let buffered = push_buffer.get(&key).expect("buffer entry");
2598 assert_eq!(buffered.len(), PUSH_BUFFER_MAX_PER_KEY);
2599 let tasks: Vec<String> = buffered
2600 .iter()
2601 .filter_map(completion_task)
2602 .map(str::to_string)
2603 .collect();
2604 assert_eq!(tasks.first().map(String::as_str), Some("task-3"));
2605 assert_eq!(
2606 tasks.last().map(String::as_str),
2607 Some(format!("task-{}", total - 1).as_str())
2608 );
2609 }
2610
2611 #[test]
2612 fn replay_buffered_push_frames_drains_to_bound_channel() {
2613 let (_root_dir, root) = test_root("subc-buffer-replay-root");
2614 let key = ReplayKey {
2615 root,
2616 harness: "opencode".to_string(),
2617 session: "session-1".to_string(),
2618 };
2619 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(4);
2620 let mut push_buffer = HashMap::new();
2621 buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-a"));
2622 buffer_push_frame(&mut push_buffer, key.clone(), completion_frame("task-b"));
2623
2624 let replayed =
2625 replay_buffered_push_frames(&writer_tx, route_key(3), &mut push_buffer, &key);
2626
2627 assert_eq!(replayed, 2);
2628 assert!(!push_buffer.contains_key(&key));
2629 for expected_task in ["task-a", "task-b"] {
2630 let frame = writer_rx.try_recv().expect("replayed push");
2631 assert_eq!(frame.header.ty, FrameType::Push);
2632 assert_eq!(frame.header.channel, 3);
2633 let body: serde_json::Value = serde_json::from_slice(&frame.body).expect("push body");
2634 assert_eq!(body["task_id"].as_str(), Some(expected_task));
2635 }
2636 assert!(writer_rx.try_recv().is_err());
2637 }
2638
2639 #[test]
2640 fn coalesce_push_batch_collapses_lossy_and_preserves_reliable_fifo() {
2641 let (_root_dir, root) = test_root("subc-coalesce-root");
2642 let (_other_dir, other_root) = test_root("subc-coalesce-other");
2643
2644 let output = coalesce_push_batch(vec![
2645 (root.clone(), status_frame(1)),
2646 (root.clone(), completion_frame("task-1")),
2647 (root.clone(), status_frame(2)),
2648 (root.clone(), completion_frame("task-2")),
2649 (root.clone(), long_running_frame("long-task", 100)),
2650 (root.clone(), long_running_frame("long-task", 200)),
2651 (other_root.clone(), status_frame(9)),
2652 ]);
2653
2654 let completion_tasks: Vec<_> = output
2655 .iter()
2656 .filter_map(|(_, frame)| completion_task(frame))
2657 .collect();
2658 assert_eq!(completion_tasks, vec!["task-1", "task-2"]);
2659
2660 let root_statuses: Vec<_> = output
2661 .iter()
2662 .filter(|(output_root, _)| output_root == &root)
2663 .filter_map(|(_, frame)| status_seq(frame))
2664 .collect();
2665 assert_eq!(root_statuses, vec![2]);
2666
2667 let other_statuses: Vec<_> = output
2668 .iter()
2669 .filter(|(output_root, _)| output_root == &other_root)
2670 .filter_map(|(_, frame)| status_seq(frame))
2671 .collect();
2672 assert_eq!(other_statuses, vec![9]);
2673
2674 let long_running_elapsed: Vec<_> = output
2675 .iter()
2676 .filter_map(|(_, frame)| match frame {
2677 PushFrame::BashLongRunning(long_running) => Some(long_running.elapsed_ms),
2678 _ => None,
2679 })
2680 .collect();
2681 assert_eq!(long_running_elapsed, vec![200]);
2682 }
2683
2684 #[test]
2685 fn coalesce_push_batch_keeps_progress_stream_keys_separate() {
2686 let (_root_dir, root) = test_root("subc-progress-coalesce-root");
2687
2688 let output = coalesce_push_batch(vec![
2689 (
2690 root.clone(),
2691 progress_frame("request-1", ProgressKind::Stdout, "old stdout"),
2692 ),
2693 (
2694 root.clone(),
2695 progress_frame("request-1", ProgressKind::Stderr, "stderr"),
2696 ),
2697 (
2698 root.clone(),
2699 progress_frame("request-2", ProgressKind::Stdout, "other stdout"),
2700 ),
2701 (
2702 root.clone(),
2703 progress_frame("request-1", ProgressKind::Stdout, "new stdout"),
2704 ),
2705 ]);
2706
2707 let progress: Vec<_> = output
2708 .iter()
2709 .filter_map(|(_, frame)| match frame {
2710 PushFrame::Progress(progress) => Some((
2711 progress.request_id.as_str(),
2712 match progress.kind {
2713 ProgressKind::Stdout => "stdout",
2714 ProgressKind::Stderr => "stderr",
2715 },
2716 progress.chunk.as_str(),
2717 )),
2718 _ => None,
2719 })
2720 .collect();
2721
2722 assert_eq!(
2723 progress,
2724 vec![
2725 ("request-1", "stderr", "stderr"),
2726 ("request-2", "stdout", "other stdout"),
2727 ("request-1", "stdout", "new stdout"),
2728 ]
2729 );
2730 }
2731
2732 #[test]
2733 fn progress_sender_keeps_reliable_off_saturated_lossy_funnel_without_blocking() {
2734 let (_root_dir, root) = test_root("subc-push-full-root");
2735 let (lossy_tx, mut lossy_rx) = mpsc::channel::<PushEnvelope>(1);
2736 let (reliable_tx, mut reliable_rx) = mpsc::unbounded_channel::<PushEnvelope>();
2737 let sender = progress_sender_for_root(
2738 PushSenders {
2739 lossy_tx,
2740 reliable_tx,
2741 },
2742 root.clone(),
2743 );
2744
2745 let started = Instant::now();
2746 sender(status_frame(1));
2747 sender(status_frame(2));
2748 sender(completion_frame("reliable-after-lossy-full"));
2749 assert!(
2750 started.elapsed() < Duration::from_millis(50),
2751 "saturated push sender must return immediately"
2752 );
2753
2754 let (received_root, received_frame) =
2755 lossy_rx.try_recv().expect("first lossy frame queued");
2756 assert_eq!(received_root, root);
2757 assert_eq!(status_seq(&received_frame), Some(1));
2758 assert!(
2759 lossy_rx.try_recv().is_err(),
2760 "second lossy frame should be dropped"
2761 );
2762
2763 let (reliable_root, reliable_frame) = reliable_rx
2764 .try_recv()
2765 .expect("reliable frame bypasses lossy backpressure");
2766 assert_eq!(reliable_root, root);
2767 assert_eq!(
2768 completion_task(&reliable_frame),
2769 Some("reliable-after-lossy-full")
2770 );
2771 assert!(reliable_rx.try_recv().is_err());
2772 }
2773
2774 #[test]
2775 fn fan_out_lossy_push_frame_drops_when_writer_is_full_without_blocking() {
2776 let (_root_dir, root) = test_root("subc-writer-full-root");
2777 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2778 writer_tx
2779 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2780 .expect("prefill writer queue");
2781
2782 let mut root_channels = HashMap::new();
2783 root_channels.insert(root.clone(), HashSet::from([route_key(7)]));
2784
2785 let routes = HashMap::new();
2786 let started = Instant::now();
2787 let result =
2788 fan_out_lossy_push_frame(&writer_tx, &routes, &root_channels, &root, &status_frame(1));
2789 assert!(
2790 started.elapsed() < Duration::from_millis(50),
2791 "saturated writer fan-out must return immediately"
2792 );
2793 assert_eq!(
2794 result,
2795 FanOutResult {
2796 matched_channels: 1,
2797 sent_frames: 0,
2798 }
2799 );
2800
2801 let queued = writer_rx
2802 .try_recv()
2803 .expect("prefilled frame remains queued");
2804 assert_eq!(queued.header.ty, FrameType::Ping);
2805 assert!(
2806 writer_rx.try_recv().is_err(),
2807 "push should be dropped on full writer"
2808 );
2809 }
2810
2811 #[test]
2812 fn reliable_push_backpressure_buffers_and_retries_on_tick() {
2813 let (_root_dir, root) = test_root("subc-retry-buffer-root");
2814 let identity = route_identity(&root, "session-1");
2815 let key = ReplayKey::from_identity(&identity);
2816 let mut routes = HashMap::new();
2817 routes.insert(route_key(9), identity.clone());
2818 let mut root_channels = HashMap::new();
2819 root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
2820 let mut session_identity = HashMap::new();
2821 remember_session_identity(&mut session_identity, &identity);
2822 let mut retry_buffer = HashMap::new();
2823 let mut push_buffer = HashMap::new();
2824 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2825 writer_tx
2826 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2827 .expect("prefill writer queue");
2828
2829 let result = fan_out_reliable_push_frame(
2830 &writer_tx,
2831 &routes,
2832 &root_channels,
2833 &session_identity,
2834 &mut retry_buffer,
2835 &mut push_buffer,
2836 &root,
2837 &completion_frame("retry-task"),
2838 );
2839
2840 assert_eq!(
2841 result,
2842 FanOutResult {
2843 matched_channels: 1,
2844 sent_frames: 0,
2845 }
2846 );
2847 assert!(push_buffer.is_empty());
2848 assert_eq!(retry_buffer.get(&route_key(9)).map(VecDeque::len), Some(1));
2849 assert_eq!(&retry_buffer[&route_key(9)][0].0, &key);
2850
2851 let queued = writer_rx.try_recv().expect("prefilled frame");
2852 assert_eq!(queued.header.ty, FrameType::Ping);
2853 assert_eq!(
2854 drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2855 1
2856 );
2857 let retried = writer_rx.try_recv().expect("retried reliable push");
2858 assert_eq!(retried.header.ty, FrameType::Push);
2859 assert_eq!(retried.header.channel, 9);
2860 assert_eq!(push_frame_task_id(&retried).as_deref(), Some("retry-task"));
2861 assert!(!retry_buffer.contains_key(&route_key(9)));
2862 }
2863
2864 #[test]
2865 fn reliable_push_fifo_gates_new_frames_behind_retry_buffer() {
2866 let (_root_dir, root) = test_root("subc-retry-fifo-root");
2867 let identity = route_identity(&root, "session-1");
2868 let mut routes = HashMap::new();
2869 routes.insert(route_key(9), identity.clone());
2870 let mut root_channels = HashMap::new();
2871 root_channels.insert(root.clone(), HashSet::from([route_key(9)]));
2872 let mut session_identity = HashMap::new();
2873 remember_session_identity(&mut session_identity, &identity);
2874 let mut retry_buffer = HashMap::new();
2875 let mut push_buffer = HashMap::new();
2876 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(1);
2877 writer_tx
2878 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2879 .expect("prefill writer queue");
2880
2881 let first = completion_frame("fifo-1");
2882 let second = completion_frame("fifo-2");
2883 let _ = fan_out_reliable_push_frame(
2884 &writer_tx,
2885 &routes,
2886 &root_channels,
2887 &session_identity,
2888 &mut retry_buffer,
2889 &mut push_buffer,
2890 &root,
2891 &first,
2892 );
2893 let queued = writer_rx.try_recv().expect("free writer capacity");
2894 assert_eq!(queued.header.ty, FrameType::Ping);
2895
2896 let _ = fan_out_reliable_push_frame(
2897 &writer_tx,
2898 &routes,
2899 &root_channels,
2900 &session_identity,
2901 &mut retry_buffer,
2902 &mut push_buffer,
2903 &root,
2904 &second,
2905 );
2906 assert!(
2907 writer_rx.try_recv().is_err(),
2908 "second reliable frame must not bypass pending retry frame"
2909 );
2910 let queued_tasks: Vec<_> = retry_buffer[&route_key(9)]
2911 .iter()
2912 .filter_map(|(_, frame)| completion_task(frame))
2913 .collect();
2914 assert_eq!(queued_tasks, vec!["fifo-1", "fifo-2"]);
2915
2916 assert_eq!(
2917 drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2918 1
2919 );
2920 let first_sent = writer_rx.try_recv().expect("first reliable push");
2921 assert_eq!(push_frame_task_id(&first_sent).as_deref(), Some("fifo-1"));
2922 assert_eq!(
2923 drain_retry_buffer_for_channel(&writer_tx, route_key(9), &mut retry_buffer),
2924 1
2925 );
2926 let second_sent = writer_rx.try_recv().expect("second reliable push");
2927 assert_eq!(push_frame_task_id(&second_sent).as_deref(), Some("fifo-2"));
2928 assert!(!retry_buffer.contains_key(&route_key(9)));
2929 }
2930
2931 #[test]
2932 fn replay_buffered_push_frames_drains_incrementally_on_backpressure() {
2933 let (_root_dir, root) = test_root("subc-incremental-replay-root");
2934 let key = ReplayKey {
2935 root,
2936 harness: "opencode".to_string(),
2937 session: "session-1".to_string(),
2938 };
2939 let (writer_tx, mut writer_rx) = mpsc::channel::<Frame>(2);
2940 writer_tx
2941 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
2942 .expect("prefill writer queue");
2943 let mut push_buffer = HashMap::new();
2944 for task in ["replay-1", "replay-2", "replay-3"] {
2945 buffer_push_frame(&mut push_buffer, key.clone(), completion_frame(task));
2946 }
2947
2948 assert_eq!(
2949 replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
2950 1
2951 );
2952 assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(2));
2953 let remaining: Vec<_> = push_buffer[&key]
2954 .iter()
2955 .filter_map(completion_task)
2956 .collect();
2957 assert_eq!(remaining, vec!["replay-2", "replay-3"]);
2958
2959 let queued = writer_rx.try_recv().expect("prefilled frame");
2960 assert_eq!(queued.header.ty, FrameType::Ping);
2961 let first = writer_rx.try_recv().expect("first replayed push");
2962 assert_eq!(push_frame_task_id(&first).as_deref(), Some("replay-1"));
2963
2964 assert_eq!(
2965 replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
2966 2
2967 );
2968 let second = writer_rx.try_recv().expect("second replayed push");
2969 let third = writer_rx.try_recv().expect("third replayed push");
2970 assert_eq!(push_frame_task_id(&second).as_deref(), Some("replay-2"));
2971 assert_eq!(push_frame_task_id(&third).as_deref(), Some("replay-3"));
2972 assert!(!push_buffer.contains_key(&key));
2973 }
2974
2975 #[test]
2976 fn goodbye_migrates_retry_buffer_into_detach_replay() {
2977 let (_root_dir, root) = test_root("subc-goodbye-migration-root");
2978 let key = ReplayKey {
2979 root,
2980 harness: "opencode".to_string(),
2981 session: "session-1".to_string(),
2982 };
2983 let mut retry_buffer = HashMap::new();
2984 buffer_retry_frame(
2985 &mut retry_buffer,
2986 route_key(5),
2987 key.clone(),
2988 completion_frame("migrated-task"),
2989 );
2990 let mut push_buffer = HashMap::new();
2991
2992 assert_eq!(
2993 migrate_retry_buffer_to_push_buffer(&mut retry_buffer, route_key(5), &mut push_buffer),
2994 1
2995 );
2996
2997 assert!(!retry_buffer.contains_key(&route_key(5)));
2998 assert_eq!(push_buffer.get(&key).map(VecDeque::len), Some(1));
2999 assert_eq!(
3000 completion_task(&push_buffer[&key][0]),
3001 Some("migrated-task")
3002 );
3003 }
3004
3005 #[test]
3006 fn permanent_push_send_failure_is_dropped_not_retried_forever() {
3007 let (_root_dir, root) = test_root("subc-permanent-failure-root");
3008 let key = ReplayKey {
3009 root,
3010 harness: "opencode".to_string(),
3011 session: "session-1".to_string(),
3012 };
3013 let (writer_tx, writer_rx) = mpsc::channel::<Frame>(1);
3014 drop(writer_rx);
3015
3016 let mut push_buffer = HashMap::new();
3017 buffer_push_frame(
3018 &mut push_buffer,
3019 key.clone(),
3020 completion_frame("closed-replay"),
3021 );
3022 assert_eq!(
3023 replay_buffered_push_frames(&writer_tx, route_key(4), &mut push_buffer, &key),
3024 0
3025 );
3026 assert!(!push_buffer.contains_key(&key));
3027
3028 let mut retry_buffer = HashMap::new();
3029 buffer_retry_frame(
3030 &mut retry_buffer,
3031 route_key(4),
3032 key,
3033 completion_frame("closed-retry"),
3034 );
3035 assert_eq!(
3036 drain_retry_buffer_for_channel(&writer_tx, route_key(4), &mut retry_buffer),
3037 0
3038 );
3039 assert!(!retry_buffer.contains_key(&route_key(4)));
3040 }
3041
3042 #[test]
3043 fn completed_task_suppresses_stale_long_running_lossy_push() {
3044 let mut completed_tasks = CompletedTaskIds::default();
3045 assert!(!should_drop_lossy_push(
3046 &completed_tasks,
3047 &long_running_frame("stale-task", 100)
3048 ));
3049
3050 completed_tasks.remember("stale-task");
3051
3052 assert!(should_drop_lossy_push(
3053 &completed_tasks,
3054 &long_running_frame("stale-task", 200)
3055 ));
3056 assert!(!should_drop_lossy_push(
3057 &completed_tasks,
3058 &long_running_frame("other-task", 200)
3059 ));
3060 }
3061
3062 #[tokio::test]
3063 async fn control_send_times_out_when_writer_queue_remains_full() {
3064 let (writer_tx, _writer_rx) = mpsc::channel::<Frame>(1);
3065 writer_tx
3066 .try_send(Frame::build(FrameType::Ping, control_flags(), 0, 1, Vec::new()).unwrap())
3067 .expect("prefill writer queue");
3068 let started = Instant::now();
3069
3070 let result = send_frame(
3071 &writer_tx,
3072 Frame::build(FrameType::Pong, control_flags(), 0, 2, Vec::new()).unwrap(),
3073 )
3074 .await;
3075
3076 assert!(matches!(result, Err(SubcError::WriterBackpressureTimeout)));
3077 assert!(
3078 started.elapsed() < Duration::from_secs(2),
3079 "control send guard should be bounded"
3080 );
3081 }
3082
3083 const CORE_TOOLS: [&str; 8] = [
3084 "status", "read", "grep", "search", "outline", "inspect", "edit", "write",
3085 ];
3086
3087 fn is_bare_placeholder_schema(schema: &Value) -> bool {
3088 schema == &json!({ "type": "object" })
3089 }
3090
3091 #[test]
3092 fn build_manifest_serves_embedded_tool_schemas() {
3093 let manifest = build_manifest();
3094 let tools = match manifest.provides.first() {
3095 Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3096 _ => panic!("expected ToolProvider"),
3097 };
3098 let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3099 for name in CORE_TOOLS {
3100 let tool = by_name
3101 .get(name)
3102 .unwrap_or_else(|| panic!("missing tool {name}"));
3103 assert!(
3104 !is_bare_placeholder_schema(&tool.schema),
3105 "{name} must not use bare placeholder schema"
3106 );
3107 assert_eq!(
3108 tool.schema.get("type").and_then(|v| v.as_str()),
3109 Some("object"),
3110 "{name} schema must be an object"
3111 );
3112 }
3113
3114 let read = by_name["read"]
3115 .schema
3116 .get("properties")
3117 .and_then(|p| p.as_object());
3118 let read_props = read.expect("read schema properties");
3119 assert!(
3120 read_props.contains_key("filePath"),
3121 "read schema must expose filePath"
3122 );
3123
3124 let status = &by_name["status"].schema;
3125 assert_eq!(
3126 status.get("properties").and_then(|v| v.as_object()),
3127 Some(&serde_json::Map::new()),
3128 "status schema must have empty properties"
3129 );
3130 assert_eq!(
3131 status.get("additionalProperties").and_then(|v| v.as_bool()),
3132 Some(false),
3133 "status schema must forbid additionalProperties"
3134 );
3135 }
3136
3137 #[test]
3138 fn build_manifest_classifies_execution_mode_by_observable_effect() {
3139 let manifest = build_manifest();
3140 let tools = match manifest.provides.first() {
3141 Some(ProviderRole::ToolProvider { tools, .. }) => tools,
3142 _ => panic!("expected ToolProvider"),
3143 };
3144 let by_name: HashMap<&str, &Tool> = tools.iter().map(|t| (t.name.as_str(), t)).collect();
3145
3146 for name in ["status", "read", "grep", "search", "outline", "inspect"] {
3149 assert_eq!(
3150 by_name[name].execution_mode,
3151 ExecutionMode::Pure,
3152 "{name} produces no observable side effect and must be Pure"
3153 );
3154 }
3155 for name in ["edit", "write"] {
3157 assert_eq!(
3158 by_name[name].execution_mode,
3159 ExecutionMode::Mutating,
3160 "{name} writes files and must be Mutating"
3161 );
3162 }
3163 }
3164}
3165
3166#[derive(Debug)]
3167pub enum SubcError {
3168 Runtime(std::io::Error),
3169 ConnectionFile {
3170 path: PathBuf,
3171 source: subc_transport::ConnectionFileError,
3172 },
3173 NoEndpoint {
3174 path: PathBuf,
3175 },
3176 InvalidEndpoint {
3177 path: PathBuf,
3178 endpoint: String,
3179 },
3180 Connect {
3181 endpoint: String,
3182 source: std::io::Error,
3183 },
3184 Auth {
3185 endpoint: String,
3186 source: subc_transport::AuthError,
3187 },
3188 FrameIo(subc_transport::FrameIoError),
3189 FrameBuild(subc_protocol::FrameBuildError),
3190 WriterClosed,
3191 WriterBackpressureTimeout,
3192 WriterJoin(tokio::task::JoinError),
3193 Json(serde_json::Error),
3194 ClosedBeforeHelloAck,
3195 HelloRejected {
3196 body: Option<ErrorBody>,
3197 },
3198 UnexpectedFrame {
3199 ty: FrameType,
3200 },
3201}
3202
3203impl fmt::Display for SubcError {
3204 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3205 match self {
3206 Self::Runtime(e) => write!(f, "failed to build subc tokio runtime: {e}"),
3207 Self::ConnectionFile { path, source } => {
3208 write!(f, "failed to read subc connection file {path:?}: {source}")
3209 }
3210 Self::NoEndpoint { path } => {
3211 write!(f, "subc connection file {path:?} has no endpoints")
3212 }
3213 Self::InvalidEndpoint { path, endpoint } => {
3214 write!(
3215 f,
3216 "subc connection file {path:?} has invalid endpoint {endpoint}"
3217 )
3218 }
3219 Self::Connect { endpoint, source } => {
3220 write!(f, "failed to connect to subc endpoint {endpoint}: {source}")
3221 }
3222 Self::Auth { endpoint, source } => {
3223 write!(
3224 f,
3225 "failed to authenticate to subc endpoint {endpoint}: {source}"
3226 )
3227 }
3228 Self::FrameIo(e) => write!(f, "subc frame I/O error: {e}"),
3229 Self::FrameBuild(e) => write!(f, "subc frame build error: {e}"),
3230 Self::WriterClosed => write!(f, "subc writer task closed"),
3231 Self::WriterBackpressureTimeout => write!(
3232 f,
3233 "subc writer task stayed backpressured while sending a control frame"
3234 ),
3235 Self::WriterJoin(e) => write!(f, "subc writer task join error: {e}"),
3236 Self::Json(e) => write!(f, "subc JSON error: {e}"),
3237 Self::ClosedBeforeHelloAck => {
3238 write!(f, "subc daemon closed the connection before HelloAck")
3239 }
3240 Self::HelloRejected { body } => match body {
3241 Some(b) => write!(f, "subc rejected ModuleHello: {} ({})", b.code, b.message),
3242 None => write!(f, "subc rejected ModuleHello (unparseable error body)"),
3243 },
3244 Self::UnexpectedFrame { ty } => {
3245 write!(f, "subc sent unexpected frame in place of HelloAck: {ty:?}")
3246 }
3247 }
3248 }
3249}
3250
3251impl std::error::Error for SubcError {}