1use crate::agent_core::team::completion_routing::{
2 PendingRoutingRecord, ReviewAttemptDiagnostic, ReviewFailureClassification,
3 RoutingDeliveryStatus, TeamRoutingEnvelope,
4};
5use crate::agent_core::team::milestone::TeamMilestoneEvent;
6use crate::agent_core::team::milestone_delivery::{milestone_dedupe_key, milestone_is_public};
7use crate::agent_core::team::registry::TaskStatus;
8use crate::agent_core::team::session::{ChannelSendSourceKind, ChannelSendStatus, TeamSession};
9use crate::agent_core::{SessionRegistry, TurnExecutionContext};
10use crate::channel_registry::ChannelRegistry;
11use crate::config::{GatewayConfig, InteractionMode};
12use crate::delivery_resolver::resolve_delivery;
13use crate::protocol::{DashboardEvent, InboundMsg, MsgContent, MsgSource, OutboundMsg, SessionKey};
14use anyhow::Result;
15use std::sync::{Arc, OnceLock};
16use std::time::Duration;
17use tokio::sync::mpsc;
18
19fn default_channel_instance_for_scope(
20 cfg: &GatewayConfig,
21 channel_name: &str,
22 scope: &str,
23) -> Option<String> {
24 if !scope.starts_with("user:") {
25 return None;
26 }
27 match channel_name {
28 "lark" => cfg
29 .channels
30 .lark
31 .as_ref()
32 .map(|lark| lark.default_instance_id().to_string()),
33 _ => None,
34 }
35}
36
37async fn send_with_reply_fallback(
38 channel: &Arc<dyn crate::channels_internal::Channel>,
39 outbound: OutboundMsg,
40) -> (OutboundMsg, anyhow::Result<()>) {
41 match channel.send(&outbound).await {
42 Ok(()) => (outbound, Ok(())),
43 Err(error) if outbound.reply_to.is_some() => {
44 tracing::warn!(
45 channel = %channel.name(),
46 reply_to = outbound.reply_to.as_deref().unwrap_or(""),
47 error = %error,
48 "milestone reply_to send failed; retrying as direct scope send"
49 );
50 let mut fallback = outbound.clone();
51 fallback.reply_to = None;
52 match channel.send(&fallback).await {
53 Ok(()) => (fallback, Ok(())),
54 Err(fallback_error) => (fallback, Err(fallback_error)),
55 }
56 }
57 Err(error) => (outbound, Err(error)),
58 }
59}
60
61pub async fn wire_team_runtime(
62 registry: Arc<SessionRegistry>,
63 cfg: &GatewayConfig,
64 channel_map: Arc<ChannelRegistry>,
65 heartbeat_interval: Duration,
66) -> Result<()> {
67 use crate::agent_core::team::{
68 completion_routing::{RoutingDeliveryStatus, TeamNotifyRequest},
69 heartbeat::DispatchFn,
70 orchestrator::TeamOrchestrator,
71 registry::TaskRegistry,
72 session::{stable_team_id_for_session_key, TeamSession},
73 };
74
75 let (team_notify_tx, mut team_notify_rx) = mpsc::channel::<TeamNotifyRequest>(256);
76 let team_notify_tx_for_orch = team_notify_tx.clone();
77 let team_scopes = cfg.normalized_team_scopes();
78 let mut review_retry_orchestrators = Vec::new();
79 let cfg_for_delivery = Arc::new(cfg.clone());
80 tracing::info!(
81 count = team_scopes.len(),
82 "wire_team_runtime: team scopes found"
83 );
84
85 for team_scope in &team_scopes {
86 tracing::info!(scope = %team_scope.scope, name = ?team_scope.name, "wire_team_runtime: wiring team scope");
87 let Some(channel_name) = team_scope.mode.channel.clone() else {
88 tracing::error!(
89 scope = %team_scope.scope,
90 "team scope is missing mode.channel; skipping team runtime wiring for this scope"
91 );
92 continue;
93 };
94 let lead_channel_instance =
95 default_channel_instance_for_scope(cfg, &channel_name, &team_scope.scope);
96 let lead_key = crate::protocol::SessionKey {
97 channel: channel_name.clone(),
98 channel_instance: lead_channel_instance.clone(),
99 scope: team_scope.scope.clone(),
100 };
101 let team_id = stable_team_id_for_session_key(&lead_key);
102 let session = match TeamSession::new(&team_scope.scope, &team_id) {
103 Ok(s) => Arc::new(s),
104 Err(e) => {
105 tracing::error!(scope = %team_scope.scope, "Failed to create TeamSession: {e:#}");
106 continue;
107 }
108 };
109 let db_path = session.dir.join("tasks.db");
110 let task_registry = match TaskRegistry::new(db_path.to_str().unwrap_or(":memory:")) {
111 Ok(r) => Arc::new(r),
112 Err(e) => {
113 tracing::error!(scope = %team_scope.scope, "Failed to open TaskRegistry: {e:#}");
114 continue;
115 }
116 };
117 let registry_for_dispatch = Arc::clone(®istry);
118 let registry_for_milestone = Arc::clone(®istry);
119 let task_reg_for_dispatch = Arc::clone(&task_registry);
120 let team_session_for_dispatch = Arc::clone(&session);
121 let dispatch_requester_key = lead_key.clone();
122 let team_orch_for_dispatch: Arc<OnceLock<Arc<TeamOrchestrator>>> =
123 Arc::new(OnceLock::new());
124 let team_orch_for_dispatch_in_closure = Arc::clone(&team_orch_for_dispatch);
125 let team_orch_for_milestone: Arc<OnceLock<Arc<TeamOrchestrator>>> =
126 Arc::new(OnceLock::new());
127 let dispatch_fn: DispatchFn = Arc::new(move |agent: String, task| {
128 let registry = Arc::clone(®istry_for_dispatch);
129 let task_reg = Arc::clone(&task_reg_for_dispatch);
130 let team_session = Arc::clone(&team_session_for_dispatch);
131 let requester_key = dispatch_requester_key.clone();
132 let team_orch_cell = Arc::clone(&team_orch_for_dispatch_in_closure);
133 Box::pin(async move {
134 let specialist_key = team_session.specialist_session_key(&agent);
135 let specialist_channel = specialist_key.channel.clone();
136 let reminder = team_session.build_task_reminder(&task, &task_reg);
137 registry.set_task_reminder(specialist_key.clone(), reminder);
138 let dispatch_started_at = chrono::Utc::now();
139 if let Some(team_orch) = team_orch_cell.get() {
140 team_orch.record_dispatch_start(
141 &task.id,
142 &agent,
143 requester_key.clone(),
144 None,
145 team_orch.lead_delivery_source(),
146 );
147 }
148 let msg = crate::protocol::InboundMsg {
149 id: uuid::Uuid::new_v4().to_string(),
150 session_key: specialist_key,
151 content: crate::protocol::MsgContent::text(
152 team_session.build_task_dispatch_message(&task),
153 ),
154 sender: "orchestrator".to_string(),
155 channel: specialist_channel,
156 timestamp: chrono::Utc::now(),
157 thread_ts: None,
158 target_agent: Some(format!("@{}", agent)),
159 source: crate::protocol::MsgSource::Heartbeat,
160 };
161 let result = registry
162 .handle_with_context(msg, TurnExecutionContext::default())
163 .await;
164 let specialist_session_id = registry
165 .session_manager_ref()
166 .get_or_create(&team_session.specialist_session_key(&agent))
167 .await?;
168 let captured_reply_text = capture_specialist_reply_text(
169 registry.session_manager_ref().storage().as_ref(),
170 specialist_session_id,
171 &result,
172 )
173 .await
174 .unwrap_or_else(|error| {
175 tracing::warn!(
176 error = %error,
177 task_id = %task.id,
178 agent = %agent,
179 "failed to capture specialist reply text for team diagnostics"
180 );
181 None
182 });
183 let reply_excerpt =
184 missing_completion_excerpt(&result, captured_reply_text.as_deref());
185 if let Some(ref reply_text) = captured_reply_text {
186 let _ = team_session.append_specialist_reply(&agent, &task.id, reply_text);
187 }
188 if let Some(team_orch) = team_orch_cell.get() {
189 let outcome =
190 team_orch.classify_specialist_turn(&task.id, &agent, dispatch_started_at);
191 if matches!(
192 outcome,
193 crate::agent_core::team::specialist_turn::SpecialistTurnOutcome::MissingCompletion
194 ) {
195 if let Some(ref reply_text) = captured_reply_text {
196 let _ = persist_missing_completion_reply_artifacts(
197 team_session.as_ref(),
198 &task.id,
199 &agent,
200 reply_text,
201 );
202 }
203 team_orch.handle_specialist_missing_completion(
204 &task.id,
205 &agent,
206 reply_excerpt.as_deref(),
207 )?;
208 registry
209 .session_manager_ref()
210 .reset_conversation(specialist_session_id)
211 .await?;
212 }
213 }
214 if result.is_ok() {
215 if let Some(team_orch) = team_orch_cell.get() {
216 team_orch.notify_task_dispatched(&task.id, &task.title, &agent);
217 }
218 }
219 result.map(|_| ())
220 })
221 });
222
223 let team_orch = TeamOrchestrator::new(
224 task_registry,
225 Arc::clone(&session),
226 dispatch_fn,
227 heartbeat_interval,
228 );
229 let _ = team_orch_for_dispatch.set(Arc::clone(&team_orch));
230 let _ = team_orch_for_milestone.set(Arc::clone(&team_orch));
231
232 let channels_for_notify = Arc::clone(&channel_map);
233 let session_for_notify = Arc::clone(&session);
234 let public_updates_mode = team_scope.team.public_updates;
235 let lead_agent_name_for_notify = team_scope.mode.front_bot.clone();
236 let cfg_for_milestone = Arc::clone(&cfg_for_delivery);
237 let team_orch_for_milestone_in_closure = Arc::clone(&team_orch_for_milestone);
238 team_orch.set_milestone_fn(Arc::new(
239 move |scope: crate::protocol::SessionKey, event| {
240 use crate::agent_core::team::milestone::render_for_im;
241 if !milestone_is_public(&event, public_updates_mode) {
242 tracing::debug!(
243 scope = %scope.scope,
244 kind = %event.kind_str(),
245 "Suppressing internal-only team milestone from direct channel delivery"
246 );
247 return;
248 }
249 if let Some(dedupe_key) = milestone_dedupe_key(&event) {
250 match session_for_notify.mark_delivery_dedupe(&scope.scope, &dedupe_key) {
251 Ok(true) => {}
252 Ok(false) => {
253 let _ = session_for_notify
254 .record_delivery_dedupe_hit(&scope.scope, &dedupe_key);
255 tracing::debug!(
256 scope = %scope.scope,
257 kind = %event.kind_str(),
258 "Suppressing duplicate team milestone channel delivery"
259 );
260 return;
261 }
262 Err(err) => {
263 tracing::warn!(
264 scope = %scope.scope,
265 kind = %event.kind_str(),
266 error = %err,
267 "Failed to persist milestone delivery dedupe key"
268 );
269 }
270 }
271 }
272 let msg = render_for_im(&event);
273 let channels = Arc::clone(&channels_for_notify);
274 let session_for_record = Arc::clone(&session_for_notify);
275 let lead_agent_name = lead_agent_name_for_notify.clone();
276 let dedupe_key_for_record = milestone_dedupe_key(&event);
277 let cfg = Arc::clone(&cfg_for_milestone);
278 let team_orch_cell = Arc::clone(&team_orch_for_milestone_in_closure);
279 let registry_for_send = Arc::clone(®istry_for_milestone);
280 tokio::spawn(async move {
281 let stored_source = team_orch_cell
282 .get()
283 .and_then(|team_orch| team_orch.lead_delivery_source());
284 let (source_kind, source_agent) =
285 milestone_channel_origin(&event, lead_agent_name.as_deref());
286 let resolved = resolve_delivery(
287 cfg.as_ref(),
288 channels.as_ref(),
289 milestone_delivery_purpose(&event),
290 &scope,
291 None,
292 stored_source.as_ref(),
293 Some(&source_agent),
294 None,
295 None,
296 );
297 let (outbound, send_result, sender_channel_instance) = if let Some(resolved) =
298 resolved
299 {
300 let sender_channel_instance = resolved.sender_channel_instance.clone();
301 let outbound = resolved.outbound_text(&msg);
302 let (outbound, send_result) =
303 send_with_reply_fallback(&resolved.sender, outbound).await;
304 (outbound, send_result, sender_channel_instance)
305 } else if let Some(ch) = channels.resolve_for_session(&scope) {
306 let outbound = crate::protocol::OutboundMsg {
307 session_key: scope.clone(),
308 content: crate::protocol::MsgContent::text(msg),
309 reply_to: stored_source
310 .as_ref()
311 .and_then(|source| source.reply_to.clone()),
312 thread_ts: stored_source
313 .as_ref()
314 .and_then(|source| source.thread_ts.clone()),
315 };
316 let (outbound, send_result) = send_with_reply_fallback(&ch, outbound).await;
317 (outbound, send_result, None)
318 } else {
319 return;
320 };
321 if let Err(e) = &send_result {
322 tracing::error!("Milestone notify send error: {e}");
323 }
324 let (status, error) = match send_result {
325 Ok(()) => (ChannelSendStatus::Sent, None),
326 Err(err) => (ChannelSendStatus::SendFailed, Some(err.to_string())),
327 };
328 match session_for_record.record_channel_send(
329 &outbound.session_key.channel,
330 sender_channel_instance.as_deref(),
331 outbound.session_key.channel_instance.as_deref(),
332 &outbound.session_key.scope,
333 None,
334 stored_source.as_ref(),
335 outbound.reply_to.as_deref(),
336 outbound.thread_ts.as_deref(),
337 source_kind,
338 &source_agent,
339 milestone_task_id(&event),
340 dedupe_key_for_record.as_deref(),
341 outbound.content.as_text().unwrap_or_default(),
342 status,
343 error.as_deref(),
344 ) {
345 Ok(record) => registry_for_send.emit_dashboard_event(
346 DashboardEvent::TeamChannelSend {
347 team_id: session_for_record.team_id.clone(),
348 record,
349 },
350 ),
351 Err(err) => {
352 tracing::warn!(
353 team_id = %session_for_record.team_id,
354 error = %err,
355 "Failed to append milestone channel send ledger entry"
356 );
357 }
358 }
359 });
360 },
361 ));
362
363 team_orch.set_lead_session_key(lead_key.clone());
364 team_orch.set_scope(lead_key);
365 if let Some(front_bot) = &team_scope.mode.front_bot {
366 team_orch.set_lead_agent_name(front_bot.clone());
367 tracing::info!(front_bot = %front_bot, scope = %team_scope.scope, "Lead agent wired from front_bot");
368 }
369 if !team_scope.team.roster.is_empty() {
370 team_orch.set_available_specialists(team_scope.team.roster.clone());
371 tracing::info!(specialists = ?team_scope.team.roster, scope = %team_scope.scope, "Available specialists wired");
372 }
373 team_orch.set_max_parallel(team_scope.team.max_parallel);
374 tracing::info!(
375 scope = %team_scope.scope,
376 max_parallel = team_scope.team.max_parallel,
377 "team dispatch limit wired"
378 );
379
380 team_orch.set_team_notify_tx(team_notify_tx_for_orch.clone());
381
382 team_orch.bootstrap_workspace_artifacts().map_err(|e| {
383 anyhow::anyhow!(
384 "failed to bootstrap team workspace for scope '{}' (team '{}'): {e:#}",
385 team_scope.scope,
386 team_id
387 )
388 })?;
389 registry.register_team_orchestrator(team_id.clone(), team_orch);
390 review_retry_orchestrators.push(
391 registry
392 .get_team_orchestrator(&team_id)
393 .expect("team orchestrator should be immediately retrievable after registration"),
394 );
395 tracing::info!(scope = %team_scope.scope, team_id = %team_id, "TeamOrchestrator registered");
396 }
397
398 if !review_retry_orchestrators.is_empty() {
399 tokio::spawn(async move {
400 let mut interval = tokio::time::interval(Duration::from_secs(5));
401 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
402 loop {
403 interval.tick().await;
404 for team_orch in &review_retry_orchestrators {
405 team_orch.retry_due_pending_routing_events();
406 }
407 }
408 });
409 tracing::info!("Team review retry task started");
410 }
411
412 for group in cfg.groups.iter().filter(|g| g.mode.auto_promote) {
413 registry.add_auto_promote_scope(group.scope.clone());
414 tracing::info!(scope = %group.scope, "auto_promote keyword detection enabled");
415 }
416
417 for group in cfg
418 .groups
419 .iter()
420 .filter(|group| !matches!(group.mode.interaction, InteractionMode::Team))
421 {
422 if let Some(front_bot) = &group.mode.front_bot {
423 registry.register_scope_binding_with_channel(
424 group.mode.channel.clone(),
425 group.scope.clone(),
426 front_bot.clone(),
427 );
428 tracing::info!(scope = %group.scope, front_bot = %front_bot, "scope binding registered");
429 }
430 }
431
432 for team_scope in &team_scopes {
433 if let Some(front_bot) = &team_scope.mode.front_bot {
434 registry.register_scope_binding_with_channel(
435 team_scope.mode.channel.clone(),
436 team_scope.scope.clone(),
437 front_bot.clone(),
438 );
439 tracing::info!(scope = %team_scope.scope, front_bot = %front_bot, "team scope binding registered");
440 }
441 }
442
443 for binding in &cfg.bindings {
444 registry.register_binding(binding.to_binding_rule());
445 tracing::info!(agent = %binding.agent_name(), kind = ?binding, "routing binding registered");
446 }
447
448 {
449 let registry_for_notify = Arc::clone(®istry);
450 tokio::spawn(async move {
451 while let Some(request) = team_notify_rx.recv().await {
452 let base_record = request.clone().into_pending_record();
453 if let Some(team_orch) =
454 registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
455 {
456 if routing_event_is_stale_for_delivery(team_orch.as_ref(), &request.envelope) {
457 tracing::info!(
458 team_id = %request.envelope.team_id,
459 task_id = %request.envelope.event.task_id,
460 kind = ?request.envelope.event.kind,
461 "Suppressing stale team routing event before lead delivery"
462 );
463 team_orch.mark_routing_event_delivered(
464 &request
465 .envelope
466 .clone()
467 .with_delivery_status(RoutingDeliveryStatus::DirectDelivered),
468 );
469 continue;
470 }
471 }
472 let text = render_routing_event_for_delivery(&base_record);
473 let mut delivered = None;
474 let mut pending_record: Option<PendingRoutingRecord> = None;
475
476 for (attempt_index, target) in routing_attempt_targets(&request.envelope)
477 .into_iter()
478 .enumerate()
479 {
480 let busy = registry_for_notify.is_session_busy(&target);
481 let turn_ctx = team_notify_turn_context(&request.envelope, &target);
482 let inbound = team_notify_inbound(
483 &target,
484 &text,
485 turn_ctx
486 .delivery_source
487 .as_ref()
488 .and_then(|source| source.thread_ts.clone()),
489 );
490 if let Some(team_orch) =
491 registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
492 {
493 if base_record.review.is_some() {
494 team_orch.begin_lead_review_attempt(
495 &request.envelope.run_id,
496 &request.envelope.event.task_id,
497 request.envelope.event.kind.clone(),
498 );
499 }
500 }
501 let delivery_result = registry_for_notify
502 .handle_with_context(inbound, turn_ctx)
503 .await;
504 if let Some(team_orch) =
505 registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
506 {
507 if base_record.review.is_some() {
508 team_orch.end_lead_review_attempt(&request.envelope.run_id);
509 }
510 }
511 match delivery_result {
512 Ok(result_text) => {
513 if let Some(team_orch) =
514 registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
515 {
516 if routing_event_still_requires_resolution_after_delivery(
517 team_orch.as_ref(),
518 &request.envelope,
519 base_record.review.as_ref(),
520 ) {
521 let (classification, reason) =
522 unresolved_review_failure_from_turn_result(
523 result_text.as_deref(),
524 &request.envelope,
525 );
526 let record = base_record
527 .clone()
528 .with_delivery_status(
529 RoutingDeliveryStatus::PersistedPending,
530 )
531 .note_failed_attempt(
532 classification,
533 reason,
534 Some(next_review_retry_at(
535 base_record
536 .review
537 .as_ref()
538 .map(|review| review.attempt_count + 1)
539 .unwrap_or(1),
540 )),
541 );
542 if let Some(diagnostic) = review_attempt_diagnostic(&record) {
543 let _ = team_orch
544 .session
545 .append_review_attempt_diagnostic(&diagnostic);
546 }
547 pending_record = Some(record);
548 tracing::warn!(
549 team_id = %request.envelope.team_id,
550 task_id = %request.envelope.event.task_id,
551 kind = ?request.envelope.event.kind,
552 target = %target.scope,
553 "TeamNotify turn completed without resolving required team action; treating delivery as incomplete"
554 );
555 continue;
556 }
557 }
558 delivered = Some(request.envelope.clone().with_delivery_status(
559 delivery_status_for_attempt(attempt_index, busy),
560 ));
561 break;
562 }
563 Err(e) => {
564 let requester_scope = request
565 .envelope
566 .requester_session_key
567 .as_ref()
568 .map(|key| key.scope.as_str())
569 .unwrap_or("<none>");
570 tracing::warn!(
571 team_id = %request.envelope.team_id,
572 requester = %requester_scope,
573 target = %target.scope,
574 attempt_index,
575 "TeamNotify delivery attempt failed: {e}"
576 );
577 if let Some(team_orch) =
578 registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
579 {
580 let record = base_record
581 .clone()
582 .with_delivery_status(RoutingDeliveryStatus::PersistedPending)
583 .note_failed_attempt(
584 ReviewFailureClassification::RuntimeError,
585 e.to_string(),
586 Some(next_review_retry_at(
587 base_record
588 .review
589 .as_ref()
590 .map(|review| review.attempt_count + 1)
591 .unwrap_or(1),
592 )),
593 );
594 if let Some(diagnostic) = review_attempt_diagnostic(&record) {
595 let _ = team_orch
596 .session
597 .append_review_attempt_diagnostic(&diagnostic);
598 }
599 pending_record = Some(record);
600 }
601 }
602 }
603 }
604
605 if let Some(team_orch) =
606 registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
607 {
608 if let Some(delivered) = delivered {
609 team_orch.mark_routing_event_delivered(&delivered);
610 } else {
611 let pending = pending_record.unwrap_or_else(|| {
612 base_record
613 .clone()
614 .with_delivery_status(RoutingDeliveryStatus::PersistedPending)
615 });
616 team_orch.persist_pending_routing_record(pending);
617 }
618 }
619 }
620 });
621 tracing::info!("TeamNotify redispatch task started");
622 }
623
624 Ok(())
625}
626
627fn milestone_channel_origin(
628 event: &TeamMilestoneEvent,
629 lead_agent_name: Option<&str>,
630) -> (ChannelSendSourceKind, String) {
631 match event {
632 TeamMilestoneEvent::LeadMessage { .. } => (
633 ChannelSendSourceKind::LeadText,
634 lead_agent_name.unwrap_or("leader").to_string(),
635 ),
636 TeamMilestoneEvent::TaskDispatched { agent, .. }
637 | TeamMilestoneEvent::TaskCheckpoint { agent, .. }
638 | TeamMilestoneEvent::TaskSubmitted { agent, .. }
639 | TeamMilestoneEvent::TaskBlocked { agent, .. }
640 | TeamMilestoneEvent::TaskDone { agent, .. } => {
641 (ChannelSendSourceKind::Milestone, agent.clone())
642 }
643 TeamMilestoneEvent::TaskFailed { agent, .. } => {
644 (ChannelSendSourceKind::Milestone, agent.clone())
645 }
646 TeamMilestoneEvent::TasksUnlocked { .. } | TeamMilestoneEvent::AllTasksDone => {
647 (ChannelSendSourceKind::Milestone, "team-runtime".to_string())
648 }
649 }
650}
651
652fn milestone_delivery_purpose(event: &TeamMilestoneEvent) -> crate::config::DeliveryPurposeConfig {
653 match event {
654 TeamMilestoneEvent::LeadMessage { .. } => crate::config::DeliveryPurposeConfig::LeadMessage,
655 _ => crate::config::DeliveryPurposeConfig::Milestone,
656 }
657}
658
659fn milestone_task_id(event: &TeamMilestoneEvent) -> Option<&str> {
660 match event {
661 TeamMilestoneEvent::TaskDispatched { task_id, .. }
662 | TeamMilestoneEvent::TaskCheckpoint { task_id, .. }
663 | TeamMilestoneEvent::TaskSubmitted { task_id, .. }
664 | TeamMilestoneEvent::TaskBlocked { task_id, .. }
665 | TeamMilestoneEvent::TaskFailed { task_id, .. }
666 | TeamMilestoneEvent::TaskDone { task_id, .. } => Some(task_id.as_str()),
667 TeamMilestoneEvent::TasksUnlocked { .. }
668 | TeamMilestoneEvent::AllTasksDone
669 | TeamMilestoneEvent::LeadMessage { .. } => None,
670 }
671}
672
/// Truncates `text` to at most `max_chars` Unicode scalar values, appending
/// "..." only when something was actually cut off. Single pass over the input.
fn truncate_for_missing_completion(text: &str, max_chars: usize) -> String {
    let mut chars = text.chars();
    let kept: String = chars.by_ref().take(max_chars).collect();
    // If the iterator still has a char left, the text was longer than the cap.
    if chars.next().is_some() {
        format!("{kept}...")
    } else {
        kept
    }
}
680
681async fn capture_specialist_reply_text(
682 storage: &crate::session::SessionStorage,
683 specialist_session_id: uuid::Uuid,
684 result: &anyhow::Result<Option<String>>,
685) -> Result<Option<String>> {
686 if let Ok(Some(reply_text)) = result {
687 if !reply_text.trim().is_empty() {
688 return Ok(Some(reply_text.clone()));
689 }
690 }
691
692 let recent = storage
693 .load_recent_messages(specialist_session_id, 20)
694 .await
695 .unwrap_or_default();
696 Ok(recent
697 .iter()
698 .rev()
699 .find(|msg| msg.role == "assistant" && !msg.content.trim().is_empty())
700 .map(|msg| msg.content.clone()))
701}
702
703fn missing_completion_excerpt(
704 result: &anyhow::Result<Option<String>>,
705 captured_reply_text: Option<&str>,
706) -> Option<String> {
707 if let Some(reply_text) = captured_reply_text {
708 return Some(truncate_for_missing_completion(reply_text, 240));
709 }
710 match result {
711 Err(error) => Some(truncate_for_missing_completion(
712 &format!("runtime error: {error}"),
713 240,
714 )),
715 Ok(None) => Some(
718 "zero-output turn: backend returned no text (possible cold-start, subprocess \
719 initialization failure, or MCP bridge unavailable)"
720 .to_string(),
721 ),
722 Ok(Some(text)) if text.trim().is_empty() => Some(
723 "zero-output turn: backend returned empty text (possible cold-start, subprocess \
724 initialization failure, or MCP bridge unavailable)"
725 .to_string(),
726 ),
727 Ok(Some(_)) => None,
728 }
729}
730
731fn persist_missing_completion_reply_artifacts(
732 team_session: &TeamSession,
733 task_id: &str,
734 agent: &str,
735 reply_text: &str,
736) -> Result<()> {
737 let result_path = team_session.task_dir(task_id).join("result.md");
738 let existing = std::fs::read_to_string(&result_path).unwrap_or_default();
739 if existing.trim().is_empty() {
740 team_session.write_task_result(
741 task_id,
742 &format!(
743 "# Draft Specialist Output\n\nThis task ended without a canonical team completion tool call.\nThe raw assistant reply from `{agent}` was preserved below for review/retry.\n\n---\n\n{reply_text}\n"
744 ),
745 )?;
746 }
747 team_session.append_task_progress(
748 task_id,
749 &format!(
750 "[{}] preserved raw reply text from {} before specialist session reset.",
751 chrono::Utc::now().to_rfc3339(),
752 agent
753 ),
754 )?;
755 Ok(())
756}
757
758fn routing_attempt_targets(envelope: &TeamRoutingEnvelope) -> Vec<SessionKey> {
759 let mut targets = Vec::with_capacity(1 + envelope.fallback_session_keys.len());
760 if let Some(requester) = &envelope.requester_session_key {
761 targets.push(requester.clone());
762 }
763 for key in &envelope.fallback_session_keys {
764 if !targets.contains(key) {
765 targets.push(key.clone());
766 }
767 }
768 targets
769}
770
771fn delivery_status_for_attempt(attempt_index: usize, busy: bool) -> RoutingDeliveryStatus {
772 if attempt_index > 0 {
773 RoutingDeliveryStatus::FallbackRedirected
774 } else if busy {
775 RoutingDeliveryStatus::QueuedDelivered
776 } else {
777 RoutingDeliveryStatus::DirectDelivered
778 }
779}
780
781fn team_notify_inbound(target: &SessionKey, text: &str, thread_ts: Option<String>) -> InboundMsg {
782 InboundMsg {
783 id: uuid::Uuid::new_v4().to_string(),
784 session_key: target.clone(),
785 content: MsgContent::text(text),
786 sender: "gateway".to_string(),
787 channel: target.channel.clone(),
788 timestamp: chrono::Utc::now(),
789 thread_ts,
790 target_agent: None,
791 source: MsgSource::TeamNotify,
792 }
793}
794
795fn team_notify_turn_context(
796 envelope: &TeamRoutingEnvelope,
797 target: &SessionKey,
798) -> TurnExecutionContext {
799 let delivery_source = envelope
800 .delivery_source
801 .as_ref()
802 .filter(|source| source.session_key() == *target)
803 .cloned();
804 TurnExecutionContext { delivery_source }
805}
806
807fn routing_event_is_stale_for_delivery(
808 team_orch: &crate::agent_core::team::orchestrator::TeamOrchestrator,
809 envelope: &TeamRoutingEnvelope,
810) -> bool {
811 let task = match team_orch.registry.get_task(&envelope.event.task_id) {
812 Ok(Some(task)) => task,
813 Ok(None) | Err(_) => return false,
814 };
815
816 match envelope.event.kind {
817 crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskCheckpoint => {
818 matches!(
819 task.status_parsed(),
820 TaskStatus::Submitted { .. }
821 | TaskStatus::Accepted { .. }
822 | TaskStatus::Done
823 | TaskStatus::Failed(_)
824 )
825 }
826 crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskSubmitted => {
827 matches!(
828 task.status_parsed(),
829 TaskStatus::Accepted { .. } | TaskStatus::Done | TaskStatus::Failed(_)
830 )
831 }
832 _ => false,
833 }
834}
835
836fn render_routing_event_for_delivery(record: &PendingRoutingRecord) -> String {
837 let mut rendered = record.envelope.event.render_for_parent();
838 let Some(review) = record.review.as_ref() else {
839 return rendered;
840 };
841 if review.attempt_count == 0 {
842 return rendered;
843 }
844
845 let failure_label = review
846 .last_failure_classification
847 .map(review_failure_label)
848 .unwrap_or("未分类失败");
849 let failure_reason = review
850 .last_failure_reason
851 .as_deref()
852 .unwrap_or("上一轮未留下明确失败原因");
853 let corrective_contract =
854 review_retry_corrective_contract(review.review_kind, &record.envelope.event.task_id);
855
856 rendered = format!(
857 "[系统纠偏提醒]\n此前同一控制面事件已失败 {attempts} 次。\n最近一次失败分类:{failure_label}\n最近一次失败原因:{failure_reason}\n\n{corrective_contract}\n\n{rendered}",
858 attempts = review.attempt_count,
859 );
860 rendered
861}
862
863fn review_failure_label(classification: ReviewFailureClassification) -> &'static str {
864 match classification {
865 ReviewFailureClassification::NoOp => "NoOp",
866 ReviewFailureClassification::RuntimeError => "RuntimeError",
867 ReviewFailureClassification::DeliveryFailure => "DeliveryFailure",
868 ReviewFailureClassification::StillRequiresResolution => "StillRequiresResolution",
869 }
870}
871
872fn review_retry_corrective_contract(
873 review_kind: crate::agent_core::team::completion_routing::ReviewRequiredKind,
874 task_id: &str,
875) -> String {
876 match review_kind {
877 crate::agent_core::team::completion_routing::ReviewRequiredKind::Submitted => format!(
878 "这是 submitted 验收纠偏回合,不是新的用户请求。不要再输出解释性文字作为结束。先检查结果工件,然后在本轮结束前恰好调用一个工具:accept_task(task_id=\"{task_id}\") 或 reopen_task(task_id=\"{task_id}\", reason=\"...\")。"
879 ),
880 crate::agent_core::team::completion_routing::ReviewRequiredKind::Blocked => format!(
881 "这是 blocked 处理纠偏回合,不是新的用户请求。你必须明确处理 {task_id}:优先用内部动作继续推进;只有在确实需要用户决策时,才调用 post_update(...) 向用户说明阻塞并请求决策。"
882 ),
883 crate::agent_core::team::completion_routing::ReviewRequiredKind::Failed => format!(
884 "这是 failed 处理纠偏回合,不是新的用户请求。你必须明确处理 {task_id}:若要交还用户决策,调用 post_update(...) 说明失败、原因以及“重试 / 终止 / 改派”的选择;不要静默结束。"
885 ),
886 crate::agent_core::team::completion_routing::ReviewRequiredKind::MissingCompletion => format!(
887 "这是 missing-completion 纠偏回合,不是新的用户请求。你必须对 {task_id} 采取内部动作继续推进,例如 reopen_task(...) 或 assign_task(...);仅 post_update(...) 不能算完成。"
888 ),
889 }
890}
891
892fn routing_event_still_requires_resolution_after_delivery(
893 team_orch: &crate::agent_core::team::orchestrator::TeamOrchestrator,
894 envelope: &TeamRoutingEnvelope,
895 review: Option<&crate::agent_core::team::completion_routing::ReviewAttemptMetadata>,
896) -> bool {
897 let task = match team_orch.registry.get_task(&envelope.event.task_id) {
898 Ok(Some(task)) => task,
899 Ok(None) | Err(_) => return false,
900 };
901
902 match envelope.event.kind {
903 crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskSubmitted => {
904 matches!(task.status_parsed(), TaskStatus::Submitted { .. })
905 }
906 crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskBlocked
907 | crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskMissingCompletion => {
908 let still_held = matches!(task.status_parsed(), TaskStatus::Held { .. });
909 if !still_held {
910 return false;
911 }
912 if matches!(
913 envelope.event.kind,
914 crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskBlocked
915 ) && review_terminal_post_update_recorded(team_orch, &envelope.event.task_id, review)
916 {
917 return false;
918 }
919 true
920 }
921 crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskFailed => {
922 !review_terminal_post_update_recorded(team_orch, &envelope.event.task_id, review)
923 }
924 _ => false,
925 }
926}
927
928fn review_terminal_post_update_recorded(
929 team_orch: &crate::agent_core::team::orchestrator::TeamOrchestrator,
930 task_id: &str,
931 review: Option<&crate::agent_core::team::completion_routing::ReviewAttemptMetadata>,
932) -> bool {
933 let Some(review) = review else {
934 return false;
935 };
936 team_orch
937 .session
938 .has_post_update_for_task_since(task_id, &review.first_pending_at)
939 .unwrap_or(false)
940}
941
942fn unresolved_review_failure_from_turn_result(
943 result_text: Option<&str>,
944 envelope: &TeamRoutingEnvelope,
945) -> (ReviewFailureClassification, String) {
946 match result_text {
947 None => (
948 ReviewFailureClassification::NoOp,
949 format!(
950 "lead review turn for {:?} produced no output and did not resolve the task state",
951 envelope.event.kind
952 ),
953 ),
954 Some(text) if text.trim().is_empty() => (
955 ReviewFailureClassification::NoOp,
956 format!(
957 "lead review turn for {:?} produced empty output and did not resolve the task state",
958 envelope.event.kind
959 ),
960 ),
961 Some(text) => (
962 ReviewFailureClassification::StillRequiresResolution,
963 format!(
964 "lead review turn for {:?} produced output without resolving the task state: {}",
965 envelope.event.kind,
966 truncate_for_missing_completion(text, 160)
967 ),
968 ),
969 }
970}
971
972fn next_review_retry_at(attempt_count: u32) -> String {
973 let backoff_seconds = match attempt_count {
974 0 | 1 => 5,
975 2 => 15,
976 3 => 30,
977 _ => 60,
978 };
979 (chrono::Utc::now() + chrono::Duration::seconds(backoff_seconds)).to_rfc3339()
980}
981
982fn review_attempt_diagnostic(record: &PendingRoutingRecord) -> Option<ReviewAttemptDiagnostic> {
983 let review = record.review.as_ref()?;
984 let classification = review.last_failure_classification?;
985 let reason = review.last_failure_reason.clone()?;
986 Some(ReviewAttemptDiagnostic {
987 ts: chrono::Utc::now().to_rfc3339(),
988 run_id: record.envelope.run_id.clone(),
989 team_id: record.envelope.team_id.clone(),
990 task_id: record.envelope.event.task_id.clone(),
991 event_kind: record.envelope.event.kind.clone(),
992 attempt_count: review.attempt_count,
993 classification,
994 reason,
995 })
996}
997
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_core::team::completion_routing::{RoutingDeliveryStatus, TeamRoutingEvent};
    use crate::agent_core::team::milestone::TeamMilestoneEvent;
    use crate::agent_core::team::milestone_delivery::{milestone_is_public, TeamPublicUpdatesMode};
    use crate::agent_core::team::orchestrator::TeamOrchestrator;
    use crate::agent_core::team::registry::{CreateTask, TaskRegistry};
    use crate::agent_core::team::session::{stable_team_id_for_session_key, TeamSession};
    use crate::config::{ChannelsSection, GatewayConfig, LarkSection, ProgressPresentationMode};
    use anyhow::Result;
    use async_trait::async_trait;
    use std::sync::Arc;
    use std::sync::Mutex;
    use tempfile::tempdir;
    use tokio::sync::mpsc;
    use uuid::Uuid;

    // Channel test double: records every successfully "sent" message and can
    // be configured to fail any send that carries a reply_to, which exercises
    // the reply-fallback retry path in send_with_reply_fallback.
    struct MockChannel {
        sent: Mutex<Vec<OutboundMsg>>,
        fail_when_reply_to: bool,
    }

    #[async_trait]
    impl crate::channels_internal::Channel for MockChannel {
        fn name(&self) -> &str {
            "mock"
        }

        async fn send(&self, msg: &OutboundMsg) -> Result<()> {
            // Simulate a channel that rejects threaded replies (e.g. the
            // reply target no longer exists) when configured to do so.
            if self.fail_when_reply_to && msg.reply_to.is_some() {
                anyhow::bail!("reply target not found");
            }
            self.sent.lock().unwrap().push(msg.clone());
            Ok(())
        }

        async fn listen(&self, _tx: mpsc::Sender<InboundMsg>) -> Result<()> {
            Ok(())
        }
    }

    // The requester session must appear first and duplicate fallback keys
    // (including a repeat of the requester itself) must be collapsed.
    #[test]
    fn routing_attempt_targets_dedupes_requester_and_fallbacks() {
        let requester = SessionKey::new("ws", "group:req");
        let lead = SessionKey::new("ws", "group:lead");
        let envelope = TeamRoutingEnvelope {
            run_id: "run-1".into(),
            parent_run_id: None,
            requester_session_key: Some(requester.clone()),
            fallback_session_keys: vec![requester.clone(), lead.clone(), lead.clone()],
            team_id: "team-1".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::failed("T001", "boom"),
            delivery_source: None,
        };

        let targets = routing_attempt_targets(&envelope);
        assert_eq!(targets, vec![requester, lead]);
    }

    // With no requester session, delivery may proceed to fallback keys alone.
    #[test]
    fn routing_attempt_targets_allow_fallback_only_delivery() {
        let lead = SessionKey::new("ws", "group:lead");
        let envelope = TeamRoutingEnvelope {
            run_id: "run-1".into(),
            parent_run_id: None,
            requester_session_key: None,
            fallback_session_keys: vec![lead.clone()],
            team_id: "team-1".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::failed("T001", "boom"),
            delivery_source: None,
        };

        let targets = routing_attempt_targets(&envelope);
        assert_eq!(targets, vec![lead]);
    }

    // Attempt index 0 maps to direct/queued delivery depending on the queued
    // flag; any later attempt index is reported as a fallback redirect.
    #[test]
    fn delivery_status_uses_fallback_redirected_for_secondary_target() {
        assert_eq!(
            delivery_status_for_attempt(0, false),
            RoutingDeliveryStatus::DirectDelivered
        );
        assert_eq!(
            delivery_status_for_attempt(0, true),
            RoutingDeliveryStatus::QueuedDelivered
        );
        assert_eq!(
            delivery_status_for_attempt(1, false),
            RoutingDeliveryStatus::FallbackRedirected
        );
    }

    // A checkpoint event for a task that has since been submitted is stale
    // and must be suppressed instead of delivered.
    #[test]
    fn stale_checkpoint_is_suppressed_after_submission() {
        let tmp = tempdir().unwrap();
        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
        // Dispatch is a no-op; these tests only drive registry state.
        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
        let orch = TeamOrchestrator::new(
            registry.clone(),
            session,
            dispatch_fn,
            std::time::Duration::from_secs(60),
        );
        registry
            .create_task(CreateTask {
                id: "T004".into(),
                title: "task".into(),
                assignee_hint: Some("codex-beta".into()),
                deps: vec![],
                timeout_secs: 60,
                spec: None,
                success_criteria: None,
            })
            .unwrap();
        registry.try_claim("T004", "codex-beta").unwrap();
        registry
            .submit_task_result("T004", "codex-beta", "done")
            .unwrap();

        let envelope = TeamRoutingEnvelope {
            run_id: "run-1".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::checkpoint("T004", "codex-beta", "still working"),
            delivery_source: None,
        };

        assert!(routing_event_is_stale_for_delivery(
            orch.as_ref(),
            &envelope
        ));
    }

    // A delivered "submitted" event keeps demanding resolution until the
    // task leaves the Submitted state (here via accept_task).
    #[test]
    fn submitted_delivery_requires_explicit_resolution_until_status_changes() {
        let tmp = tempdir().unwrap();
        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
        let orch = TeamOrchestrator::new(
            registry.clone(),
            session,
            dispatch_fn,
            std::time::Duration::from_secs(60),
        );
        registry
            .create_task(CreateTask {
                id: "T006".into(),
                title: "task".into(),
                assignee_hint: Some("codex-beta".into()),
                deps: vec![],
                timeout_secs: 60,
                spec: None,
                success_criteria: None,
            })
            .unwrap();
        registry.try_claim("T006", "codex-beta").unwrap();
        registry
            .submit_task_result("T006", "codex-beta", "done")
            .unwrap();

        let envelope = TeamRoutingEnvelope {
            run_id: "run-6".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::submitted("T006", "codex-beta", "done"),
            delivery_source: None,
        };

        // Task is still Submitted, so the event is unresolved.
        assert!(routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &envelope,
            None
        ));

        registry.accept_task("T006", "lead").unwrap();

        // Accepting the task resolves the event.
        assert!(!routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &envelope,
            None
        ));
    }

    // Blocked and missing-completion events stay unresolved while the task is
    // Held, and resolve once the task is reassigned.
    #[test]
    fn blocked_and_missing_completion_delivery_require_explicit_resolution_while_task_is_held() {
        let tmp = tempdir().unwrap();
        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
        let orch = TeamOrchestrator::new(
            registry.clone(),
            session,
            dispatch_fn,
            std::time::Duration::from_secs(60),
        );
        registry
            .create_task(CreateTask {
                id: "T007".into(),
                title: "task".into(),
                assignee_hint: Some("codex-beta".into()),
                deps: vec![],
                timeout_secs: 60,
                spec: None,
                success_criteria: None,
            })
            .unwrap();
        registry.try_claim("T007", "codex-beta").unwrap();
        registry
            .hold_claim("T007", "codex-beta", "missing_completion")
            .unwrap();

        let blocked = TeamRoutingEnvelope {
            run_id: "run-7b".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::blocked("T007", "codex-beta", "blocked"),
            delivery_source: None,
        };
        let missing = TeamRoutingEnvelope {
            run_id: "run-7m".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::missing_completion("T007", "codex-beta"),
            delivery_source: None,
        };

        // While the task is Held, both event kinds remain unresolved.
        assert!(routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &blocked,
            None
        ));
        assert!(routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &missing,
            None
        ));

        registry.reassign_task("T007", "worker").unwrap();

        // Reassignment takes the task out of Held, resolving both events.
        assert!(!routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &blocked,
            None
        ));
        assert!(!routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &missing,
            None
        ));
    }

    // None → NoOp; non-empty text → StillRequiresResolution with excerpt.
    #[test]
    fn unresolved_review_failure_classification_distinguishes_noop_from_textful_turn() {
        let envelope = TeamRoutingEnvelope {
            run_id: "run-review".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::submitted("T008", "codex-beta", "done"),
            delivery_source: None,
        };

        let (classification, reason) = unresolved_review_failure_from_turn_result(None, &envelope);
        assert_eq!(classification, ReviewFailureClassification::NoOp);
        assert!(reason.contains("produced no output"));

        let (classification, reason) = unresolved_review_failure_from_turn_result(
            Some("I reviewed it but won't accept yet"),
            &envelope,
        );
        assert_eq!(
            classification,
            ReviewFailureClassification::StillRequiresResolution
        );
        assert!(reason.contains("produced output without resolving"));
    }

    // Retry rendering should include the corrective-reminder banner, the prior
    // failure count/classification, and the concrete tool-call suggestions.
    #[test]
    fn render_review_retry_delivery_text_includes_previous_failure_context() {
        let envelope = TeamRoutingEnvelope {
            run_id: "run-review".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::PersistedPending,
            event: TeamRoutingEvent::submitted("T010", "codex-beta", "done"),
            delivery_source: None,
        };
        let record = PendingRoutingRecord::from_envelope(envelope).note_failed_attempt(
            ReviewFailureClassification::StillRequiresResolution,
            "lead review turn completed without accept_task/reopen_task",
            None,
        );

        let rendered = render_routing_event_for_delivery(&record);
        assert!(rendered.contains("[系统纠偏提醒]"));
        assert!(rendered.contains("此前同一控制面事件已失败 1 次"));
        assert!(rendered.contains("StillRequiresResolution"));
        assert!(rendered.contains("accept_task(task_id=\"T010\")"));
        assert!(rendered.contains("reopen_task(task_id=\"T010\", reason=\"...\")"));
    }

    // For terminal failed/blocked events, a lead post_message made inside a
    // review attempt window counts as resolution (the user was notified), so
    // the event stops requiring explicit task-state changes.
    #[test]
    fn failed_and_blocked_reviews_can_resolve_via_post_update_after_user_notification() {
        let tmp = tempdir().unwrap();
        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
        let orch = TeamOrchestrator::new(
            registry.clone(),
            session,
            dispatch_fn,
            std::time::Duration::from_secs(60),
        );
        orch.set_lead_agent_name("claude-alpha".into());

        registry
            .create_task(CreateTask {
                id: "T009".into(),
                title: "failed-task".into(),
                assignee_hint: Some("codex-beta".into()),
                deps: vec![],
                timeout_secs: 60,
                spec: None,
                success_criteria: None,
            })
            .unwrap();
        registry.mark_failed("T009", "quota").unwrap();

        let failed = TeamRoutingEnvelope {
            run_id: "run-9f".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::failed("T009", "quota"),
            delivery_source: None,
        };
        let failed_review =
            crate::agent_core::team::completion_routing::PendingRoutingRecord::from_envelope(
                failed.clone(),
            );
        // Unresolved before any post-update is recorded.
        assert!(routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &failed,
            failed_review.review.as_ref(),
        ));
        orch.begin_lead_review_attempt("run-9f", "T009", failed.event.kind.clone());
        assert!(orch.post_message("任务失败,是否重试请用户决定"));
        orch.end_lead_review_attempt("run-9f");
        // Posting inside the review attempt resolves the failed event.
        assert!(!routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &failed,
            failed_review.review.as_ref(),
        ));

        registry
            .create_task(CreateTask {
                id: "T010".into(),
                title: "blocked-task".into(),
                assignee_hint: Some("codex-beta".into()),
                deps: vec![],
                timeout_secs: 60,
                spec: None,
                success_criteria: None,
            })
            .unwrap();
        registry.try_claim("T010", "codex-beta").unwrap();
        registry
            .hold_claim("T010", "codex-beta", "waiting_user")
            .unwrap();

        let blocked = TeamRoutingEnvelope {
            run_id: "run-10b".into(),
            parent_run_id: None,
            requester_session_key: Some(SessionKey::new("lark", "group:test")),
            fallback_session_keys: vec![],
            team_id: "team-test".into(),
            delivery_status: RoutingDeliveryStatus::NotRouted,
            event: TeamRoutingEvent::blocked("T010", "codex-beta", "waiting_user"),
            delivery_source: None,
        };
        let blocked_review =
            crate::agent_core::team::completion_routing::PendingRoutingRecord::from_envelope(
                blocked.clone(),
            );
        assert!(routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &blocked,
            blocked_review.review.as_ref(),
        ));
        orch.begin_lead_review_attempt("run-10b", "T010", blocked.event.kind.clone());
        assert!(orch.post_message("任务阻塞,需要用户决定下一步"));
        orch.end_lead_review_attempt("run-10b");
        // Same resolution path applies to the blocked event.
        assert!(!routing_event_still_requires_resolution_after_delivery(
            orch.as_ref(),
            &blocked,
            blocked_review.review.as_ref(),
        ));
    }

    // When the turn result already carries text, no session lookup is needed.
    #[tokio::test]
    async fn capture_specialist_reply_text_prefers_direct_result() {
        let dir = tempdir().unwrap();
        let storage = crate::session::SessionStorage::new(dir.path().to_path_buf());
        let session_id = Uuid::new_v4();

        let captured = capture_specialist_reply_text(
            &storage,
            session_id,
            &Ok(Some("direct result".to_string())),
        )
        .await
        .unwrap();

        assert_eq!(captured.as_deref(), Some("direct result"));
    }

    // With an empty turn result, the latest persisted assistant message from
    // the session's messages.jsonl is used as the reply text.
    #[tokio::test]
    async fn capture_specialist_reply_text_falls_back_to_persisted_assistant_message() {
        let dir = tempdir().unwrap();
        let storage = crate::session::SessionStorage::new(dir.path().to_path_buf());
        let session_id = Uuid::new_v4();
        let session_dir = dir.path().join(session_id.to_string());
        std::fs::create_dir_all(&session_dir).unwrap();
        std::fs::write(
            session_dir.join("messages.jsonl"),
            format!(
                "{}\n{}\n",
                serde_json::json!({
                    "id": Uuid::new_v4(),
                    "role": "user",
                    "content": "spec",
                    "timestamp": chrono::Utc::now(),
                    "sender": "orchestrator",
                    "tool_calls": null,
                    "fragment_event_ids": null,
                    "aggregation_mode": null,
                }),
                serde_json::json!({
                    "id": Uuid::new_v4(),
                    "role": "assistant",
                    "content": "persisted assistant output",
                    "timestamp": chrono::Utc::now(),
                    "sender": "@codex-beta",
                    "tool_calls": null,
                    "fragment_event_ids": null,
                    "aggregation_mode": null,
                })
            ),
        )
        .unwrap();

        let captured = capture_specialist_reply_text(&storage, session_id, &Ok(None))
            .await
            .unwrap();

        assert_eq!(captured.as_deref(), Some("persisted assistant output"));
    }

    // A runtime error with no captured text still yields a diagnostic excerpt.
    #[test]
    fn missing_completion_excerpt_falls_back_to_runtime_error() {
        let excerpt =
            missing_completion_excerpt(&Err(anyhow::anyhow!("tool bridge unavailable")), None)
                .unwrap();
        assert!(excerpt.contains("runtime error: tool bridge unavailable"));
    }

    // Ok(None) is reported as a zero-output (cold-start) turn.
    #[test]
    fn missing_completion_excerpt_zero_output_ok_none() {
        let excerpt = missing_completion_excerpt(&Ok(None), None).unwrap();
        assert!(
            excerpt.contains("zero-output turn"),
            "Ok(None) should produce zero-output diagnostic, got: {excerpt}"
        );
        assert!(excerpt.contains("cold-start"));
    }

    // An empty string result is treated the same as no result at all.
    #[test]
    fn missing_completion_excerpt_zero_output_ok_empty_string() {
        let excerpt = missing_completion_excerpt(&Ok(Some(String::new())), None).unwrap();
        assert!(
            excerpt.contains("zero-output turn"),
            "Ok(Some(\"\")) should produce zero-output diagnostic, got: {excerpt}"
        );
    }

    // Genuine output with no captured text needs no excerpt at all.
    #[test]
    fn missing_completion_excerpt_nonempty_result_without_captured_returns_none() {
        let excerpt = missing_completion_excerpt(&Ok(Some("actual output".to_string())), None);
        assert!(
            excerpt.is_none(),
            "Non-empty Ok(Some) without captured text should yield None, got: {excerpt:?}"
        );
    }

    // Preserving a raw specialist reply writes both a draft result.md and a
    // progress.md note into the task directory.
    #[test]
    fn persist_missing_completion_reply_artifacts_writes_draft_result() {
        let tmp = tempdir().unwrap();
        let session = TeamSession::from_dir("team-test", tmp.path().to_path_buf());

        persist_missing_completion_reply_artifacts(
            &session,
            "T005",
            "codex-beta",
            "raw specialist answer",
        )
        .unwrap();

        let result = std::fs::read_to_string(session.task_dir("T005").join("result.md")).unwrap();
        let progress =
            std::fs::read_to_string(session.task_dir("T005").join("progress.md")).unwrap();
        assert!(result.contains("Draft Specialist Output"));
        assert!(result.contains("raw specialist answer"));
        assert!(progress.contains("preserved raw reply text"));
    }

    // Visibility matrix: lead messages are always public; AllTasksDone only
    // in Normal mode; TaskDone is never public; TaskFailed is public in Normal.
    #[test]
    fn public_milestone_visibility_respects_mode() {
        assert!(milestone_is_public(
            &TeamMilestoneEvent::LeadMessage {
                text: "hello".into()
            },
            TeamPublicUpdatesMode::Minimal
        ));
        assert!(!milestone_is_public(
            &TeamMilestoneEvent::AllTasksDone,
            TeamPublicUpdatesMode::Minimal
        ));
        assert!(milestone_is_public(
            &TeamMilestoneEvent::AllTasksDone,
            TeamPublicUpdatesMode::Normal
        ));
        assert!(!milestone_is_public(
            &TeamMilestoneEvent::TaskDone {
                task_id: "T1".into(),
                task_title: "task".into(),
                agent: "worker".into(),
                done_count: 1,
                total: 1,
            },
            TeamPublicUpdatesMode::Normal
        ));
        assert!(milestone_is_public(
            &TeamMilestoneEvent::TaskFailed {
                task_id: "T1".into(),
                agent: "worker".into(),
                reason: "boom".into(),
            },
            TeamPublicUpdatesMode::Normal
        ));
    }

    // When the threaded send fails, the message must be resent without
    // reply_to and end up delivered exactly once.
    #[tokio::test]
    async fn milestone_send_retries_without_reply_to() {
        let channel = Arc::new(MockChannel {
            sent: Mutex::new(Vec::new()),
            fail_when_reply_to: true,
        });
        let outbound = OutboundMsg {
            session_key: SessionKey::new("lark", "group:test"),
            content: MsgContent::text("hello"),
            reply_to: Some("reply-id".into()),
            thread_ts: None,
        };
        let (sent, result) = send_with_reply_fallback(
            &(channel.clone() as Arc<dyn crate::channels_internal::Channel>),
            outbound,
        )
        .await;
        result.unwrap();
        assert!(sent.reply_to.is_none());
        assert_eq!(channel.sent.lock().unwrap().len(), 1);
    }

    // A DM (user: scope) resolved via the default lark instance must map to
    // the same stable team id as an inbound key that names the instance.
    #[test]
    fn dm_team_identity_uses_default_lark_instance() {
        let cfg = GatewayConfig {
            channels: ChannelsSection {
                lark: Some(LarkSection {
                    enabled: true,
                    presentation: ProgressPresentationMode::FinalOnly,
                    trigger_policy: None,
                    default_instance: Some("default".into()),
                    instances: vec![],
                }),
                ..Default::default()
            },
            ..Default::default()
        };

        let lead_instance =
            default_channel_instance_for_scope(&cfg, "lark", "user:ou_test").expect("instance");
        let registered = SessionKey::with_instance("lark", &lead_instance, "user:ou_test");
        let inbound = SessionKey::with_instance("lark", "default", "user:ou_test");

        assert_eq!(
            stable_team_id_for_session_key(&registered),
            stable_team_id_for_session_key(&inbound)
        );
    }

    // Group scopes must not be rewritten to the default lark instance.
    #[test]
    fn group_team_identity_does_not_force_default_lark_instance() {
        let cfg = GatewayConfig {
            channels: ChannelsSection {
                lark: Some(LarkSection {
                    enabled: true,
                    presentation: ProgressPresentationMode::FinalOnly,
                    trigger_policy: None,
                    default_instance: Some("default".into()),
                    instances: vec![],
                }),
                ..Default::default()
            },
            ..Default::default()
        };

        assert_eq!(
            default_channel_instance_for_scope(&cfg, "lark", "group:oc_test"),
            None
        );
    }
}