Skip to main content

clawbro/
team_runtime.rs

1use crate::agent_core::team::completion_routing::{
2    PendingRoutingRecord, ReviewAttemptDiagnostic, ReviewFailureClassification,
3    RoutingDeliveryStatus, TeamRoutingEnvelope,
4};
5use crate::agent_core::team::milestone::TeamMilestoneEvent;
6use crate::agent_core::team::milestone_delivery::{milestone_dedupe_key, milestone_is_public};
7use crate::agent_core::team::registry::TaskStatus;
8use crate::agent_core::team::session::{ChannelSendSourceKind, ChannelSendStatus, TeamSession};
9use crate::agent_core::{SessionRegistry, TurnExecutionContext};
10use crate::channel_registry::ChannelRegistry;
11use crate::config::{GatewayConfig, InteractionMode};
12use crate::delivery_resolver::resolve_delivery;
13use crate::protocol::{DashboardEvent, InboundMsg, MsgContent, MsgSource, OutboundMsg, SessionKey};
14use anyhow::Result;
15use std::sync::{Arc, OnceLock};
16use std::time::Duration;
17use tokio::sync::mpsc;
18
19fn default_channel_instance_for_scope(
20    cfg: &GatewayConfig,
21    channel_name: &str,
22    scope: &str,
23) -> Option<String> {
24    if !scope.starts_with("user:") {
25        return None;
26    }
27    match channel_name {
28        "lark" => cfg
29            .channels
30            .lark
31            .as_ref()
32            .map(|lark| lark.default_instance_id().to_string()),
33        _ => None,
34    }
35}
36
37async fn send_with_reply_fallback(
38    channel: &Arc<dyn crate::channels_internal::Channel>,
39    outbound: OutboundMsg,
40) -> (OutboundMsg, anyhow::Result<()>) {
41    match channel.send(&outbound).await {
42        Ok(()) => (outbound, Ok(())),
43        Err(error) if outbound.reply_to.is_some() => {
44            tracing::warn!(
45                channel = %channel.name(),
46                reply_to = outbound.reply_to.as_deref().unwrap_or(""),
47                error = %error,
48                "milestone reply_to send failed; retrying as direct scope send"
49            );
50            let mut fallback = outbound.clone();
51            fallback.reply_to = None;
52            match channel.send(&fallback).await {
53                Ok(()) => (fallback, Ok(())),
54                Err(fallback_error) => (fallback, Err(fallback_error)),
55            }
56        }
57        Err(error) => (outbound, Err(error)),
58    }
59}
60
61pub async fn wire_team_runtime(
62    registry: Arc<SessionRegistry>,
63    cfg: &GatewayConfig,
64    channel_map: Arc<ChannelRegistry>,
65    heartbeat_interval: Duration,
66) -> Result<()> {
67    use crate::agent_core::team::{
68        completion_routing::{RoutingDeliveryStatus, TeamNotifyRequest},
69        heartbeat::DispatchFn,
70        orchestrator::TeamOrchestrator,
71        registry::TaskRegistry,
72        session::{stable_team_id_for_session_key, TeamSession},
73    };
74
75    let (team_notify_tx, mut team_notify_rx) = mpsc::channel::<TeamNotifyRequest>(256);
76    let team_notify_tx_for_orch = team_notify_tx.clone();
77    let team_scopes = cfg.normalized_team_scopes();
78    let mut review_retry_orchestrators = Vec::new();
79    let cfg_for_delivery = Arc::new(cfg.clone());
80    tracing::info!(
81        count = team_scopes.len(),
82        "wire_team_runtime: team scopes found"
83    );
84
85    for team_scope in &team_scopes {
86        tracing::info!(scope = %team_scope.scope, name = ?team_scope.name, "wire_team_runtime: wiring team scope");
87        let Some(channel_name) = team_scope.mode.channel.clone() else {
88            tracing::error!(
89                scope = %team_scope.scope,
90                "team scope is missing mode.channel; skipping team runtime wiring for this scope"
91            );
92            continue;
93        };
94        let lead_channel_instance =
95            default_channel_instance_for_scope(cfg, &channel_name, &team_scope.scope);
96        let lead_key = crate::protocol::SessionKey {
97            channel: channel_name.clone(),
98            channel_instance: lead_channel_instance.clone(),
99            scope: team_scope.scope.clone(),
100        };
101        let team_id = stable_team_id_for_session_key(&lead_key);
102        let session = match TeamSession::new(&team_scope.scope, &team_id) {
103            Ok(s) => Arc::new(s),
104            Err(e) => {
105                tracing::error!(scope = %team_scope.scope, "Failed to create TeamSession: {e:#}");
106                continue;
107            }
108        };
109        let db_path = session.dir.join("tasks.db");
110        let task_registry = match TaskRegistry::new(db_path.to_str().unwrap_or(":memory:")) {
111            Ok(r) => Arc::new(r),
112            Err(e) => {
113                tracing::error!(scope = %team_scope.scope, "Failed to open TaskRegistry: {e:#}");
114                continue;
115            }
116        };
117        let registry_for_dispatch = Arc::clone(&registry);
118        let registry_for_milestone = Arc::clone(&registry);
119        let task_reg_for_dispatch = Arc::clone(&task_registry);
120        let team_session_for_dispatch = Arc::clone(&session);
121        let dispatch_requester_key = lead_key.clone();
122        let team_orch_for_dispatch: Arc<OnceLock<Arc<TeamOrchestrator>>> =
123            Arc::new(OnceLock::new());
124        let team_orch_for_dispatch_in_closure = Arc::clone(&team_orch_for_dispatch);
125        let team_orch_for_milestone: Arc<OnceLock<Arc<TeamOrchestrator>>> =
126            Arc::new(OnceLock::new());
127        let dispatch_fn: DispatchFn = Arc::new(move |agent: String, task| {
128            let registry = Arc::clone(&registry_for_dispatch);
129            let task_reg = Arc::clone(&task_reg_for_dispatch);
130            let team_session = Arc::clone(&team_session_for_dispatch);
131            let requester_key = dispatch_requester_key.clone();
132            let team_orch_cell = Arc::clone(&team_orch_for_dispatch_in_closure);
133            Box::pin(async move {
134                let specialist_key = team_session.specialist_session_key(&agent);
135                let specialist_channel = specialist_key.channel.clone();
136                let reminder = team_session.build_task_reminder(&task, &task_reg);
137                registry.set_task_reminder(specialist_key.clone(), reminder);
138                let dispatch_started_at = chrono::Utc::now();
139                if let Some(team_orch) = team_orch_cell.get() {
140                    team_orch.record_dispatch_start(
141                        &task.id,
142                        &agent,
143                        requester_key.clone(),
144                        None,
145                        team_orch.lead_delivery_source(),
146                    );
147                }
148                let msg = crate::protocol::InboundMsg {
149                    id: uuid::Uuid::new_v4().to_string(),
150                    session_key: specialist_key,
151                    content: crate::protocol::MsgContent::text(
152                        team_session.build_task_dispatch_message(&task),
153                    ),
154                    sender: "orchestrator".to_string(),
155                    channel: specialist_channel,
156                    timestamp: chrono::Utc::now(),
157                    thread_ts: None,
158                    target_agent: Some(format!("@{}", agent)),
159                    source: crate::protocol::MsgSource::Heartbeat,
160                };
161                let result = registry
162                    .handle_with_context(msg, TurnExecutionContext::default())
163                    .await;
164                let specialist_session_id = registry
165                    .session_manager_ref()
166                    .get_or_create(&team_session.specialist_session_key(&agent))
167                    .await?;
168                let captured_reply_text = capture_specialist_reply_text(
169                    registry.session_manager_ref().storage().as_ref(),
170                    specialist_session_id,
171                    &result,
172                )
173                .await
174                .unwrap_or_else(|error| {
175                    tracing::warn!(
176                        error = %error,
177                        task_id = %task.id,
178                        agent = %agent,
179                        "failed to capture specialist reply text for team diagnostics"
180                    );
181                    None
182                });
183                let reply_excerpt =
184                    missing_completion_excerpt(&result, captured_reply_text.as_deref());
185                if let Some(ref reply_text) = captured_reply_text {
186                    let _ = team_session.append_specialist_reply(&agent, &task.id, reply_text);
187                }
188                if let Some(team_orch) = team_orch_cell.get() {
189                    let outcome =
190                        team_orch.classify_specialist_turn(&task.id, &agent, dispatch_started_at);
191                    if matches!(
192                        outcome,
193                        crate::agent_core::team::specialist_turn::SpecialistTurnOutcome::MissingCompletion
194                    ) {
195                        if let Some(ref reply_text) = captured_reply_text {
196                            let _ = persist_missing_completion_reply_artifacts(
197                                team_session.as_ref(),
198                                &task.id,
199                                &agent,
200                                reply_text,
201                            );
202                        }
203                        team_orch.handle_specialist_missing_completion(
204                            &task.id,
205                            &agent,
206                            reply_excerpt.as_deref(),
207                        )?;
208                        registry
209                            .session_manager_ref()
210                            .reset_conversation(specialist_session_id)
211                            .await?;
212                    }
213                }
214                if result.is_ok() {
215                    if let Some(team_orch) = team_orch_cell.get() {
216                        team_orch.notify_task_dispatched(&task.id, &task.title, &agent);
217                    }
218                }
219                result.map(|_| ())
220            })
221        });
222
223        let team_orch = TeamOrchestrator::new(
224            task_registry,
225            Arc::clone(&session),
226            dispatch_fn,
227            heartbeat_interval,
228        );
229        let _ = team_orch_for_dispatch.set(Arc::clone(&team_orch));
230        let _ = team_orch_for_milestone.set(Arc::clone(&team_orch));
231
232        let channels_for_notify = Arc::clone(&channel_map);
233        let session_for_notify = Arc::clone(&session);
234        let public_updates_mode = team_scope.team.public_updates;
235        let lead_agent_name_for_notify = team_scope.mode.front_bot.clone();
236        let cfg_for_milestone = Arc::clone(&cfg_for_delivery);
237        let team_orch_for_milestone_in_closure = Arc::clone(&team_orch_for_milestone);
238        team_orch.set_milestone_fn(Arc::new(
239            move |scope: crate::protocol::SessionKey, event| {
240                use crate::agent_core::team::milestone::render_for_im;
241                if !milestone_is_public(&event, public_updates_mode) {
242                    tracing::debug!(
243                        scope = %scope.scope,
244                        kind = %event.kind_str(),
245                        "Suppressing internal-only team milestone from direct channel delivery"
246                    );
247                    return;
248                }
249                if let Some(dedupe_key) = milestone_dedupe_key(&event) {
250                    match session_for_notify.mark_delivery_dedupe(&scope.scope, &dedupe_key) {
251                        Ok(true) => {}
252                        Ok(false) => {
253                            let _ = session_for_notify
254                                .record_delivery_dedupe_hit(&scope.scope, &dedupe_key);
255                            tracing::debug!(
256                                scope = %scope.scope,
257                                kind = %event.kind_str(),
258                                "Suppressing duplicate team milestone channel delivery"
259                            );
260                            return;
261                        }
262                        Err(err) => {
263                            tracing::warn!(
264                                scope = %scope.scope,
265                                kind = %event.kind_str(),
266                                error = %err,
267                                "Failed to persist milestone delivery dedupe key"
268                            );
269                        }
270                    }
271                }
272                let msg = render_for_im(&event);
273                let channels = Arc::clone(&channels_for_notify);
274                let session_for_record = Arc::clone(&session_for_notify);
275                let lead_agent_name = lead_agent_name_for_notify.clone();
276                let dedupe_key_for_record = milestone_dedupe_key(&event);
277                let cfg = Arc::clone(&cfg_for_milestone);
278                let team_orch_cell = Arc::clone(&team_orch_for_milestone_in_closure);
279                let registry_for_send = Arc::clone(&registry_for_milestone);
280                tokio::spawn(async move {
281                    let stored_source = team_orch_cell
282                        .get()
283                        .and_then(|team_orch| team_orch.lead_delivery_source());
284                    let (source_kind, source_agent) =
285                        milestone_channel_origin(&event, lead_agent_name.as_deref());
286                    let resolved = resolve_delivery(
287                        cfg.as_ref(),
288                        channels.as_ref(),
289                        milestone_delivery_purpose(&event),
290                        &scope,
291                        None,
292                        stored_source.as_ref(),
293                        Some(&source_agent),
294                        None,
295                        None,
296                    );
297                    let (outbound, send_result, sender_channel_instance) = if let Some(resolved) =
298                        resolved
299                    {
300                        let sender_channel_instance = resolved.sender_channel_instance.clone();
301                        let outbound = resolved.outbound_text(&msg);
302                        let (outbound, send_result) =
303                            send_with_reply_fallback(&resolved.sender, outbound).await;
304                        (outbound, send_result, sender_channel_instance)
305                    } else if let Some(ch) = channels.resolve_for_session(&scope) {
306                        let outbound = crate::protocol::OutboundMsg {
307                            session_key: scope.clone(),
308                            content: crate::protocol::MsgContent::text(msg),
309                            reply_to: stored_source
310                                .as_ref()
311                                .and_then(|source| source.reply_to.clone()),
312                            thread_ts: stored_source
313                                .as_ref()
314                                .and_then(|source| source.thread_ts.clone()),
315                        };
316                        let (outbound, send_result) = send_with_reply_fallback(&ch, outbound).await;
317                        (outbound, send_result, None)
318                    } else {
319                        return;
320                    };
321                    if let Err(e) = &send_result {
322                        tracing::error!("Milestone notify send error: {e}");
323                    }
324                    let (status, error) = match send_result {
325                        Ok(()) => (ChannelSendStatus::Sent, None),
326                        Err(err) => (ChannelSendStatus::SendFailed, Some(err.to_string())),
327                    };
328                    match session_for_record.record_channel_send(
329                        &outbound.session_key.channel,
330                        sender_channel_instance.as_deref(),
331                        outbound.session_key.channel_instance.as_deref(),
332                        &outbound.session_key.scope,
333                        None,
334                        stored_source.as_ref(),
335                        outbound.reply_to.as_deref(),
336                        outbound.thread_ts.as_deref(),
337                        source_kind,
338                        &source_agent,
339                        milestone_task_id(&event),
340                        dedupe_key_for_record.as_deref(),
341                        outbound.content.as_text().unwrap_or_default(),
342                        status,
343                        error.as_deref(),
344                    ) {
345                        Ok(record) => registry_for_send.emit_dashboard_event(
346                            DashboardEvent::TeamChannelSend {
347                                team_id: session_for_record.team_id.clone(),
348                                record,
349                            },
350                        ),
351                        Err(err) => {
352                            tracing::warn!(
353                                team_id = %session_for_record.team_id,
354                                error = %err,
355                                "Failed to append milestone channel send ledger entry"
356                            );
357                        }
358                    }
359                });
360            },
361        ));
362
363        team_orch.set_lead_session_key(lead_key.clone());
364        team_orch.set_scope(lead_key);
365        if let Some(front_bot) = &team_scope.mode.front_bot {
366            team_orch.set_lead_agent_name(front_bot.clone());
367            tracing::info!(front_bot = %front_bot, scope = %team_scope.scope, "Lead agent wired from front_bot");
368        }
369        if !team_scope.team.roster.is_empty() {
370            team_orch.set_available_specialists(team_scope.team.roster.clone());
371            tracing::info!(specialists = ?team_scope.team.roster, scope = %team_scope.scope, "Available specialists wired");
372        }
373        team_orch.set_max_parallel(team_scope.team.max_parallel);
374        tracing::info!(
375            scope = %team_scope.scope,
376            max_parallel = team_scope.team.max_parallel,
377            "team dispatch limit wired"
378        );
379
380        team_orch.set_team_notify_tx(team_notify_tx_for_orch.clone());
381
382        team_orch.bootstrap_workspace_artifacts().map_err(|e| {
383            anyhow::anyhow!(
384                "failed to bootstrap team workspace for scope '{}' (team '{}'): {e:#}",
385                team_scope.scope,
386                team_id
387            )
388        })?;
389        registry.register_team_orchestrator(team_id.clone(), team_orch);
390        review_retry_orchestrators.push(
391            registry
392                .get_team_orchestrator(&team_id)
393                .expect("team orchestrator should be immediately retrievable after registration"),
394        );
395        tracing::info!(scope = %team_scope.scope, team_id = %team_id, "TeamOrchestrator registered");
396    }
397
398    if !review_retry_orchestrators.is_empty() {
399        tokio::spawn(async move {
400            let mut interval = tokio::time::interval(Duration::from_secs(5));
401            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
402            loop {
403                interval.tick().await;
404                for team_orch in &review_retry_orchestrators {
405                    team_orch.retry_due_pending_routing_events();
406                }
407            }
408        });
409        tracing::info!("Team review retry task started");
410    }
411
412    for group in cfg.groups.iter().filter(|g| g.mode.auto_promote) {
413        registry.add_auto_promote_scope(group.scope.clone());
414        tracing::info!(scope = %group.scope, "auto_promote keyword detection enabled");
415    }
416
417    for group in cfg
418        .groups
419        .iter()
420        .filter(|group| !matches!(group.mode.interaction, InteractionMode::Team))
421    {
422        if let Some(front_bot) = &group.mode.front_bot {
423            registry.register_scope_binding_with_channel(
424                group.mode.channel.clone(),
425                group.scope.clone(),
426                front_bot.clone(),
427            );
428            tracing::info!(scope = %group.scope, front_bot = %front_bot, "scope binding registered");
429        }
430    }
431
432    for team_scope in &team_scopes {
433        if let Some(front_bot) = &team_scope.mode.front_bot {
434            registry.register_scope_binding_with_channel(
435                team_scope.mode.channel.clone(),
436                team_scope.scope.clone(),
437                front_bot.clone(),
438            );
439            tracing::info!(scope = %team_scope.scope, front_bot = %front_bot, "team scope binding registered");
440        }
441    }
442
443    for binding in &cfg.bindings {
444        registry.register_binding(binding.to_binding_rule());
445        tracing::info!(agent = %binding.agent_name(), kind = ?binding, "routing binding registered");
446    }
447
448    {
449        let registry_for_notify = Arc::clone(&registry);
450        tokio::spawn(async move {
451            while let Some(request) = team_notify_rx.recv().await {
452                let base_record = request.clone().into_pending_record();
453                if let Some(team_orch) =
454                    registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
455                {
456                    if routing_event_is_stale_for_delivery(team_orch.as_ref(), &request.envelope) {
457                        tracing::info!(
458                            team_id = %request.envelope.team_id,
459                            task_id = %request.envelope.event.task_id,
460                            kind = ?request.envelope.event.kind,
461                            "Suppressing stale team routing event before lead delivery"
462                        );
463                        team_orch.mark_routing_event_delivered(
464                            &request
465                                .envelope
466                                .clone()
467                                .with_delivery_status(RoutingDeliveryStatus::DirectDelivered),
468                        );
469                        continue;
470                    }
471                }
472                let text = render_routing_event_for_delivery(&base_record);
473                let mut delivered = None;
474                let mut pending_record: Option<PendingRoutingRecord> = None;
475
476                for (attempt_index, target) in routing_attempt_targets(&request.envelope)
477                    .into_iter()
478                    .enumerate()
479                {
480                    let busy = registry_for_notify.is_session_busy(&target);
481                    let turn_ctx = team_notify_turn_context(&request.envelope, &target);
482                    let inbound = team_notify_inbound(
483                        &target,
484                        &text,
485                        turn_ctx
486                            .delivery_source
487                            .as_ref()
488                            .and_then(|source| source.thread_ts.clone()),
489                    );
490                    if let Some(team_orch) =
491                        registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
492                    {
493                        if base_record.review.is_some() {
494                            team_orch.begin_lead_review_attempt(
495                                &request.envelope.run_id,
496                                &request.envelope.event.task_id,
497                                request.envelope.event.kind.clone(),
498                            );
499                        }
500                    }
501                    let delivery_result = registry_for_notify
502                        .handle_with_context(inbound, turn_ctx)
503                        .await;
504                    if let Some(team_orch) =
505                        registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
506                    {
507                        if base_record.review.is_some() {
508                            team_orch.end_lead_review_attempt(&request.envelope.run_id);
509                        }
510                    }
511                    match delivery_result {
512                        Ok(result_text) => {
513                            if let Some(team_orch) =
514                                registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
515                            {
516                                if routing_event_still_requires_resolution_after_delivery(
517                                    team_orch.as_ref(),
518                                    &request.envelope,
519                                    base_record.review.as_ref(),
520                                ) {
521                                    let (classification, reason) =
522                                        unresolved_review_failure_from_turn_result(
523                                            result_text.as_deref(),
524                                            &request.envelope,
525                                        );
526                                    let record = base_record
527                                        .clone()
528                                        .with_delivery_status(
529                                            RoutingDeliveryStatus::PersistedPending,
530                                        )
531                                        .note_failed_attempt(
532                                            classification,
533                                            reason,
534                                            Some(next_review_retry_at(
535                                                base_record
536                                                    .review
537                                                    .as_ref()
538                                                    .map(|review| review.attempt_count + 1)
539                                                    .unwrap_or(1),
540                                            )),
541                                        );
542                                    if let Some(diagnostic) = review_attempt_diagnostic(&record) {
543                                        let _ = team_orch
544                                            .session
545                                            .append_review_attempt_diagnostic(&diagnostic);
546                                    }
547                                    pending_record = Some(record);
548                                    tracing::warn!(
549                                        team_id = %request.envelope.team_id,
550                                        task_id = %request.envelope.event.task_id,
551                                        kind = ?request.envelope.event.kind,
552                                        target = %target.scope,
553                                        "TeamNotify turn completed without resolving required team action; treating delivery as incomplete"
554                                    );
555                                    continue;
556                                }
557                            }
558                            delivered = Some(request.envelope.clone().with_delivery_status(
559                                delivery_status_for_attempt(attempt_index, busy),
560                            ));
561                            break;
562                        }
563                        Err(e) => {
564                            let requester_scope = request
565                                .envelope
566                                .requester_session_key
567                                .as_ref()
568                                .map(|key| key.scope.as_str())
569                                .unwrap_or("<none>");
570                            tracing::warn!(
571                                team_id = %request.envelope.team_id,
572                                requester = %requester_scope,
573                                target = %target.scope,
574                                attempt_index,
575                                "TeamNotify delivery attempt failed: {e}"
576                            );
577                            if let Some(team_orch) =
578                                registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
579                            {
580                                let record = base_record
581                                    .clone()
582                                    .with_delivery_status(RoutingDeliveryStatus::PersistedPending)
583                                    .note_failed_attempt(
584                                        ReviewFailureClassification::RuntimeError,
585                                        e.to_string(),
586                                        Some(next_review_retry_at(
587                                            base_record
588                                                .review
589                                                .as_ref()
590                                                .map(|review| review.attempt_count + 1)
591                                                .unwrap_or(1),
592                                        )),
593                                    );
594                                if let Some(diagnostic) = review_attempt_diagnostic(&record) {
595                                    let _ = team_orch
596                                        .session
597                                        .append_review_attempt_diagnostic(&diagnostic);
598                                }
599                                pending_record = Some(record);
600                            }
601                        }
602                    }
603                }
604
605                if let Some(team_orch) =
606                    registry_for_notify.get_team_orchestrator(&request.envelope.team_id)
607                {
608                    if let Some(delivered) = delivered {
609                        team_orch.mark_routing_event_delivered(&delivered);
610                    } else {
611                        let pending = pending_record.unwrap_or_else(|| {
612                            base_record
613                                .clone()
614                                .with_delivery_status(RoutingDeliveryStatus::PersistedPending)
615                        });
616                        team_orch.persist_pending_routing_record(pending);
617                    }
618                }
619            }
620        });
621        tracing::info!("TeamNotify redispatch task started");
622    }
623
624    Ok(())
625}
626
627fn milestone_channel_origin(
628    event: &TeamMilestoneEvent,
629    lead_agent_name: Option<&str>,
630) -> (ChannelSendSourceKind, String) {
631    match event {
632        TeamMilestoneEvent::LeadMessage { .. } => (
633            ChannelSendSourceKind::LeadText,
634            lead_agent_name.unwrap_or("leader").to_string(),
635        ),
636        TeamMilestoneEvent::TaskDispatched { agent, .. }
637        | TeamMilestoneEvent::TaskCheckpoint { agent, .. }
638        | TeamMilestoneEvent::TaskSubmitted { agent, .. }
639        | TeamMilestoneEvent::TaskBlocked { agent, .. }
640        | TeamMilestoneEvent::TaskDone { agent, .. } => {
641            (ChannelSendSourceKind::Milestone, agent.clone())
642        }
643        TeamMilestoneEvent::TaskFailed { agent, .. } => {
644            (ChannelSendSourceKind::Milestone, agent.clone())
645        }
646        TeamMilestoneEvent::TasksUnlocked { .. } | TeamMilestoneEvent::AllTasksDone => {
647            (ChannelSendSourceKind::Milestone, "team-runtime".to_string())
648        }
649    }
650}
651
652fn milestone_delivery_purpose(event: &TeamMilestoneEvent) -> crate::config::DeliveryPurposeConfig {
653    match event {
654        TeamMilestoneEvent::LeadMessage { .. } => crate::config::DeliveryPurposeConfig::LeadMessage,
655        _ => crate::config::DeliveryPurposeConfig::Milestone,
656    }
657}
658
659fn milestone_task_id(event: &TeamMilestoneEvent) -> Option<&str> {
660    match event {
661        TeamMilestoneEvent::TaskDispatched { task_id, .. }
662        | TeamMilestoneEvent::TaskCheckpoint { task_id, .. }
663        | TeamMilestoneEvent::TaskSubmitted { task_id, .. }
664        | TeamMilestoneEvent::TaskBlocked { task_id, .. }
665        | TeamMilestoneEvent::TaskFailed { task_id, .. }
666        | TeamMilestoneEvent::TaskDone { task_id, .. } => Some(task_id.as_str()),
667        TeamMilestoneEvent::TasksUnlocked { .. }
668        | TeamMilestoneEvent::AllTasksDone
669        | TeamMilestoneEvent::LeadMessage { .. } => None,
670    }
671}
672
/// Truncates `text` to at most `max_chars` characters (Unicode scalar values, not bytes).
///
/// When `text` is longer than `max_chars`, the first `max_chars` characters are kept and
/// `"..."` is appended; otherwise `text` is returned unchanged.
fn truncate_for_missing_completion(text: &str, max_chars: usize) -> String {
    // The byte offset of the character at index `max_chars` (if any) is exactly where the kept
    // prefix ends. This is a single pass that stops at the cut point instead of collecting the
    // prefix and then counting every character of the input a second time.
    match text.char_indices().nth(max_chars) {
        Some((cut, _)) => {
            let mut truncated = String::with_capacity(cut + 3);
            truncated.push_str(&text[..cut]);
            truncated.push_str("...");
            truncated
        }
        None => text.to_owned(),
    }
}
680
681async fn capture_specialist_reply_text(
682    storage: &crate::session::SessionStorage,
683    specialist_session_id: uuid::Uuid,
684    result: &anyhow::Result<Option<String>>,
685) -> Result<Option<String>> {
686    if let Ok(Some(reply_text)) = result {
687        if !reply_text.trim().is_empty() {
688            return Ok(Some(reply_text.clone()));
689        }
690    }
691
692    let recent = storage
693        .load_recent_messages(specialist_session_id, 20)
694        .await
695        .unwrap_or_default();
696    Ok(recent
697        .iter()
698        .rev()
699        .find(|msg| msg.role == "assistant" && !msg.content.trim().is_empty())
700        .map(|msg| msg.content.clone()))
701}
702
703fn missing_completion_excerpt(
704    result: &anyhow::Result<Option<String>>,
705    captured_reply_text: Option<&str>,
706) -> Option<String> {
707    if let Some(reply_text) = captured_reply_text {
708        return Some(truncate_for_missing_completion(reply_text, 240));
709    }
710    match result {
711        Err(error) => Some(truncate_for_missing_completion(
712            &format!("runtime error: {error}"),
713            240,
714        )),
715        // Backend returned zero or empty text — no tool calls, no content.
716        // Most likely cause: ACP subprocess cold-start failure or MCP bridge unavailable.
717        Ok(None) => Some(
718            "zero-output turn: backend returned no text (possible cold-start, subprocess \
719             initialization failure, or MCP bridge unavailable)"
720                .to_string(),
721        ),
722        Ok(Some(text)) if text.trim().is_empty() => Some(
723            "zero-output turn: backend returned empty text (possible cold-start, subprocess \
724             initialization failure, or MCP bridge unavailable)"
725                .to_string(),
726        ),
727        Ok(Some(_)) => None,
728    }
729}
730
731fn persist_missing_completion_reply_artifacts(
732    team_session: &TeamSession,
733    task_id: &str,
734    agent: &str,
735    reply_text: &str,
736) -> Result<()> {
737    let result_path = team_session.task_dir(task_id).join("result.md");
738    let existing = std::fs::read_to_string(&result_path).unwrap_or_default();
739    if existing.trim().is_empty() {
740        team_session.write_task_result(
741            task_id,
742            &format!(
743                "# Draft Specialist Output\n\nThis task ended without a canonical team completion tool call.\nThe raw assistant reply from `{agent}` was preserved below for review/retry.\n\n---\n\n{reply_text}\n"
744            ),
745        )?;
746    }
747    team_session.append_task_progress(
748        task_id,
749        &format!(
750            "[{}] preserved raw reply text from {} before specialist session reset.",
751            chrono::Utc::now().to_rfc3339(),
752            agent
753        ),
754    )?;
755    Ok(())
756}
757
758fn routing_attempt_targets(envelope: &TeamRoutingEnvelope) -> Vec<SessionKey> {
759    let mut targets = Vec::with_capacity(1 + envelope.fallback_session_keys.len());
760    if let Some(requester) = &envelope.requester_session_key {
761        targets.push(requester.clone());
762    }
763    for key in &envelope.fallback_session_keys {
764        if !targets.contains(key) {
765            targets.push(key.clone());
766        }
767    }
768    targets
769}
770
771fn delivery_status_for_attempt(attempt_index: usize, busy: bool) -> RoutingDeliveryStatus {
772    if attempt_index > 0 {
773        RoutingDeliveryStatus::FallbackRedirected
774    } else if busy {
775        RoutingDeliveryStatus::QueuedDelivered
776    } else {
777        RoutingDeliveryStatus::DirectDelivered
778    }
779}
780
781fn team_notify_inbound(target: &SessionKey, text: &str, thread_ts: Option<String>) -> InboundMsg {
782    InboundMsg {
783        id: uuid::Uuid::new_v4().to_string(),
784        session_key: target.clone(),
785        content: MsgContent::text(text),
786        sender: "gateway".to_string(),
787        channel: target.channel.clone(),
788        timestamp: chrono::Utc::now(),
789        thread_ts,
790        target_agent: None,
791        source: MsgSource::TeamNotify,
792    }
793}
794
795fn team_notify_turn_context(
796    envelope: &TeamRoutingEnvelope,
797    target: &SessionKey,
798) -> TurnExecutionContext {
799    let delivery_source = envelope
800        .delivery_source
801        .as_ref()
802        .filter(|source| source.session_key() == *target)
803        .cloned();
804    TurnExecutionContext { delivery_source }
805}
806
807fn routing_event_is_stale_for_delivery(
808    team_orch: &crate::agent_core::team::orchestrator::TeamOrchestrator,
809    envelope: &TeamRoutingEnvelope,
810) -> bool {
811    let task = match team_orch.registry.get_task(&envelope.event.task_id) {
812        Ok(Some(task)) => task,
813        Ok(None) | Err(_) => return false,
814    };
815
816    match envelope.event.kind {
817        crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskCheckpoint => {
818            matches!(
819                task.status_parsed(),
820                TaskStatus::Submitted { .. }
821                    | TaskStatus::Accepted { .. }
822                    | TaskStatus::Done
823                    | TaskStatus::Failed(_)
824            )
825        }
826        crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskSubmitted => {
827            matches!(
828                task.status_parsed(),
829                TaskStatus::Accepted { .. } | TaskStatus::Done | TaskStatus::Failed(_)
830            )
831        }
832        _ => false,
833    }
834}
835
836fn render_routing_event_for_delivery(record: &PendingRoutingRecord) -> String {
837    let mut rendered = record.envelope.event.render_for_parent();
838    let Some(review) = record.review.as_ref() else {
839        return rendered;
840    };
841    if review.attempt_count == 0 {
842        return rendered;
843    }
844
845    let failure_label = review
846        .last_failure_classification
847        .map(review_failure_label)
848        .unwrap_or("未分类失败");
849    let failure_reason = review
850        .last_failure_reason
851        .as_deref()
852        .unwrap_or("上一轮未留下明确失败原因");
853    let corrective_contract =
854        review_retry_corrective_contract(review.review_kind, &record.envelope.event.task_id);
855
856    rendered = format!(
857        "[系统纠偏提醒]\n此前同一控制面事件已失败 {attempts} 次。\n最近一次失败分类:{failure_label}\n最近一次失败原因:{failure_reason}\n\n{corrective_contract}\n\n{rendered}",
858        attempts = review.attempt_count,
859    );
860    rendered
861}
862
863fn review_failure_label(classification: ReviewFailureClassification) -> &'static str {
864    match classification {
865        ReviewFailureClassification::NoOp => "NoOp",
866        ReviewFailureClassification::RuntimeError => "RuntimeError",
867        ReviewFailureClassification::DeliveryFailure => "DeliveryFailure",
868        ReviewFailureClassification::StillRequiresResolution => "StillRequiresResolution",
869    }
870}
871
872fn review_retry_corrective_contract(
873    review_kind: crate::agent_core::team::completion_routing::ReviewRequiredKind,
874    task_id: &str,
875) -> String {
876    match review_kind {
877        crate::agent_core::team::completion_routing::ReviewRequiredKind::Submitted => format!(
878            "这是 submitted 验收纠偏回合,不是新的用户请求。不要再输出解释性文字作为结束。先检查结果工件,然后在本轮结束前恰好调用一个工具:accept_task(task_id=\"{task_id}\") 或 reopen_task(task_id=\"{task_id}\", reason=\"...\")。"
879        ),
880        crate::agent_core::team::completion_routing::ReviewRequiredKind::Blocked => format!(
881            "这是 blocked 处理纠偏回合,不是新的用户请求。你必须明确处理 {task_id}:优先用内部动作继续推进;只有在确实需要用户决策时,才调用 post_update(...) 向用户说明阻塞并请求决策。"
882        ),
883        crate::agent_core::team::completion_routing::ReviewRequiredKind::Failed => format!(
884            "这是 failed 处理纠偏回合,不是新的用户请求。你必须明确处理 {task_id}:若要交还用户决策,调用 post_update(...) 说明失败、原因以及“重试 / 终止 / 改派”的选择;不要静默结束。"
885        ),
886        crate::agent_core::team::completion_routing::ReviewRequiredKind::MissingCompletion => format!(
887            "这是 missing-completion 纠偏回合,不是新的用户请求。你必须对 {task_id} 采取内部动作继续推进,例如 reopen_task(...) 或 assign_task(...);仅 post_update(...) 不能算完成。"
888        ),
889    }
890}
891
892fn routing_event_still_requires_resolution_after_delivery(
893    team_orch: &crate::agent_core::team::orchestrator::TeamOrchestrator,
894    envelope: &TeamRoutingEnvelope,
895    review: Option<&crate::agent_core::team::completion_routing::ReviewAttemptMetadata>,
896) -> bool {
897    let task = match team_orch.registry.get_task(&envelope.event.task_id) {
898        Ok(Some(task)) => task,
899        Ok(None) | Err(_) => return false,
900    };
901
902    match envelope.event.kind {
903        crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskSubmitted => {
904            matches!(task.status_parsed(), TaskStatus::Submitted { .. })
905        }
906        crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskBlocked
907        | crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskMissingCompletion => {
908            let still_held = matches!(task.status_parsed(), TaskStatus::Held { .. });
909            if !still_held {
910                return false;
911            }
912            if matches!(
913                envelope.event.kind,
914                crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskBlocked
915            ) && review_terminal_post_update_recorded(team_orch, &envelope.event.task_id, review)
916            {
917                return false;
918            }
919            true
920        }
921        crate::agent_core::team::completion_routing::TeamRoutingEventKind::TaskFailed => {
922            !review_terminal_post_update_recorded(team_orch, &envelope.event.task_id, review)
923        }
924        _ => false,
925    }
926}
927
928fn review_terminal_post_update_recorded(
929    team_orch: &crate::agent_core::team::orchestrator::TeamOrchestrator,
930    task_id: &str,
931    review: Option<&crate::agent_core::team::completion_routing::ReviewAttemptMetadata>,
932) -> bool {
933    let Some(review) = review else {
934        return false;
935    };
936    team_orch
937        .session
938        .has_post_update_for_task_since(task_id, &review.first_pending_at)
939        .unwrap_or(false)
940}
941
942fn unresolved_review_failure_from_turn_result(
943    result_text: Option<&str>,
944    envelope: &TeamRoutingEnvelope,
945) -> (ReviewFailureClassification, String) {
946    match result_text {
947        None => (
948            ReviewFailureClassification::NoOp,
949            format!(
950                "lead review turn for {:?} produced no output and did not resolve the task state",
951                envelope.event.kind
952            ),
953        ),
954        Some(text) if text.trim().is_empty() => (
955            ReviewFailureClassification::NoOp,
956            format!(
957                "lead review turn for {:?} produced empty output and did not resolve the task state",
958                envelope.event.kind
959            ),
960        ),
961        Some(text) => (
962            ReviewFailureClassification::StillRequiresResolution,
963            format!(
964                "lead review turn for {:?} produced output without resolving the task state: {}",
965                envelope.event.kind,
966                truncate_for_missing_completion(text, 160)
967            ),
968        ),
969    }
970}
971
972fn next_review_retry_at(attempt_count: u32) -> String {
973    let backoff_seconds = match attempt_count {
974        0 | 1 => 5,
975        2 => 15,
976        3 => 30,
977        _ => 60,
978    };
979    (chrono::Utc::now() + chrono::Duration::seconds(backoff_seconds)).to_rfc3339()
980}
981
982fn review_attempt_diagnostic(record: &PendingRoutingRecord) -> Option<ReviewAttemptDiagnostic> {
983    let review = record.review.as_ref()?;
984    let classification = review.last_failure_classification?;
985    let reason = review.last_failure_reason.clone()?;
986    Some(ReviewAttemptDiagnostic {
987        ts: chrono::Utc::now().to_rfc3339(),
988        run_id: record.envelope.run_id.clone(),
989        team_id: record.envelope.team_id.clone(),
990        task_id: record.envelope.event.task_id.clone(),
991        event_kind: record.envelope.event.kind.clone(),
992        attempt_count: review.attempt_count,
993        classification,
994        reason,
995    })
996}
997
998#[cfg(test)]
999mod tests {
1000    use super::*;
1001    use crate::agent_core::team::completion_routing::{RoutingDeliveryStatus, TeamRoutingEvent};
1002    use crate::agent_core::team::milestone::TeamMilestoneEvent;
1003    use crate::agent_core::team::milestone_delivery::{milestone_is_public, TeamPublicUpdatesMode};
1004    use crate::agent_core::team::orchestrator::TeamOrchestrator;
1005    use crate::agent_core::team::registry::{CreateTask, TaskRegistry};
1006    use crate::agent_core::team::session::{stable_team_id_for_session_key, TeamSession};
1007    use crate::config::{ChannelsSection, GatewayConfig, LarkSection, ProgressPresentationMode};
1008    use anyhow::Result;
1009    use async_trait::async_trait;
1010    use std::sync::Arc;
1011    use std::sync::Mutex;
1012    use tempfile::tempdir;
1013    use tokio::sync::mpsc;
1014    use uuid::Uuid;
1015
1016    struct MockChannel {
1017        sent: Mutex<Vec<OutboundMsg>>,
1018        fail_when_reply_to: bool,
1019    }
1020
1021    #[async_trait]
1022    impl crate::channels_internal::Channel for MockChannel {
1023        fn name(&self) -> &str {
1024            "mock"
1025        }
1026
1027        async fn send(&self, msg: &OutboundMsg) -> Result<()> {
1028            if self.fail_when_reply_to && msg.reply_to.is_some() {
1029                anyhow::bail!("reply target not found");
1030            }
1031            self.sent.lock().unwrap().push(msg.clone());
1032            Ok(())
1033        }
1034
1035        async fn listen(&self, _tx: mpsc::Sender<InboundMsg>) -> Result<()> {
1036            Ok(())
1037        }
1038    }
1039
1040    #[test]
1041    fn routing_attempt_targets_dedupes_requester_and_fallbacks() {
1042        let requester = SessionKey::new("ws", "group:req");
1043        let lead = SessionKey::new("ws", "group:lead");
1044        let envelope = TeamRoutingEnvelope {
1045            run_id: "run-1".into(),
1046            parent_run_id: None,
1047            requester_session_key: Some(requester.clone()),
1048            fallback_session_keys: vec![requester.clone(), lead.clone(), lead.clone()],
1049            team_id: "team-1".into(),
1050            delivery_status: RoutingDeliveryStatus::NotRouted,
1051            event: TeamRoutingEvent::failed("T001", "boom"),
1052            delivery_source: None,
1053        };
1054
1055        let targets = routing_attempt_targets(&envelope);
1056        assert_eq!(targets, vec![requester, lead]);
1057    }
1058
1059    #[test]
1060    fn routing_attempt_targets_allow_fallback_only_delivery() {
1061        let lead = SessionKey::new("ws", "group:lead");
1062        let envelope = TeamRoutingEnvelope {
1063            run_id: "run-1".into(),
1064            parent_run_id: None,
1065            requester_session_key: None,
1066            fallback_session_keys: vec![lead.clone()],
1067            team_id: "team-1".into(),
1068            delivery_status: RoutingDeliveryStatus::NotRouted,
1069            event: TeamRoutingEvent::failed("T001", "boom"),
1070            delivery_source: None,
1071        };
1072
1073        let targets = routing_attempt_targets(&envelope);
1074        assert_eq!(targets, vec![lead]);
1075    }
1076
1077    #[test]
1078    fn delivery_status_uses_fallback_redirected_for_secondary_target() {
1079        assert_eq!(
1080            delivery_status_for_attempt(0, false),
1081            RoutingDeliveryStatus::DirectDelivered
1082        );
1083        assert_eq!(
1084            delivery_status_for_attempt(0, true),
1085            RoutingDeliveryStatus::QueuedDelivered
1086        );
1087        assert_eq!(
1088            delivery_status_for_attempt(1, false),
1089            RoutingDeliveryStatus::FallbackRedirected
1090        );
1091    }
1092
1093    #[test]
1094    fn stale_checkpoint_is_suppressed_after_submission() {
1095        let tmp = tempdir().unwrap();
1096        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
1097        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
1098        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
1099            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
1100        let orch = TeamOrchestrator::new(
1101            registry.clone(),
1102            session,
1103            dispatch_fn,
1104            std::time::Duration::from_secs(60),
1105        );
1106        registry
1107            .create_task(CreateTask {
1108                id: "T004".into(),
1109                title: "task".into(),
1110                assignee_hint: Some("codex-beta".into()),
1111                deps: vec![],
1112                timeout_secs: 60,
1113                spec: None,
1114                success_criteria: None,
1115            })
1116            .unwrap();
1117        registry.try_claim("T004", "codex-beta").unwrap();
1118        registry
1119            .submit_task_result("T004", "codex-beta", "done")
1120            .unwrap();
1121
1122        let envelope = TeamRoutingEnvelope {
1123            run_id: "run-1".into(),
1124            parent_run_id: None,
1125            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1126            fallback_session_keys: vec![],
1127            team_id: "team-test".into(),
1128            delivery_status: RoutingDeliveryStatus::NotRouted,
1129            event: TeamRoutingEvent::checkpoint("T004", "codex-beta", "still working"),
1130            delivery_source: None,
1131        };
1132
1133        assert!(routing_event_is_stale_for_delivery(
1134            orch.as_ref(),
1135            &envelope
1136        ));
1137    }
1138
1139    #[test]
1140    fn submitted_delivery_requires_explicit_resolution_until_status_changes() {
1141        let tmp = tempdir().unwrap();
1142        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
1143        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
1144        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
1145            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
1146        let orch = TeamOrchestrator::new(
1147            registry.clone(),
1148            session,
1149            dispatch_fn,
1150            std::time::Duration::from_secs(60),
1151        );
1152        registry
1153            .create_task(CreateTask {
1154                id: "T006".into(),
1155                title: "task".into(),
1156                assignee_hint: Some("codex-beta".into()),
1157                deps: vec![],
1158                timeout_secs: 60,
1159                spec: None,
1160                success_criteria: None,
1161            })
1162            .unwrap();
1163        registry.try_claim("T006", "codex-beta").unwrap();
1164        registry
1165            .submit_task_result("T006", "codex-beta", "done")
1166            .unwrap();
1167
1168        let envelope = TeamRoutingEnvelope {
1169            run_id: "run-6".into(),
1170            parent_run_id: None,
1171            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1172            fallback_session_keys: vec![],
1173            team_id: "team-test".into(),
1174            delivery_status: RoutingDeliveryStatus::NotRouted,
1175            event: TeamRoutingEvent::submitted("T006", "codex-beta", "done"),
1176            delivery_source: None,
1177        };
1178
1179        assert!(routing_event_still_requires_resolution_after_delivery(
1180            orch.as_ref(),
1181            &envelope,
1182            None
1183        ));
1184
1185        registry.accept_task("T006", "lead").unwrap();
1186
1187        assert!(!routing_event_still_requires_resolution_after_delivery(
1188            orch.as_ref(),
1189            &envelope,
1190            None
1191        ));
1192    }
1193
1194    #[test]
1195    fn blocked_and_missing_completion_delivery_require_explicit_resolution_while_task_is_held() {
1196        let tmp = tempdir().unwrap();
1197        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
1198        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
1199        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
1200            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
1201        let orch = TeamOrchestrator::new(
1202            registry.clone(),
1203            session,
1204            dispatch_fn,
1205            std::time::Duration::from_secs(60),
1206        );
1207        registry
1208            .create_task(CreateTask {
1209                id: "T007".into(),
1210                title: "task".into(),
1211                assignee_hint: Some("codex-beta".into()),
1212                deps: vec![],
1213                timeout_secs: 60,
1214                spec: None,
1215                success_criteria: None,
1216            })
1217            .unwrap();
1218        registry.try_claim("T007", "codex-beta").unwrap();
1219        registry
1220            .hold_claim("T007", "codex-beta", "missing_completion")
1221            .unwrap();
1222
1223        let blocked = TeamRoutingEnvelope {
1224            run_id: "run-7b".into(),
1225            parent_run_id: None,
1226            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1227            fallback_session_keys: vec![],
1228            team_id: "team-test".into(),
1229            delivery_status: RoutingDeliveryStatus::NotRouted,
1230            event: TeamRoutingEvent::blocked("T007", "codex-beta", "blocked"),
1231            delivery_source: None,
1232        };
1233        let missing = TeamRoutingEnvelope {
1234            run_id: "run-7m".into(),
1235            parent_run_id: None,
1236            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1237            fallback_session_keys: vec![],
1238            team_id: "team-test".into(),
1239            delivery_status: RoutingDeliveryStatus::NotRouted,
1240            event: TeamRoutingEvent::missing_completion("T007", "codex-beta"),
1241            delivery_source: None,
1242        };
1243
1244        assert!(routing_event_still_requires_resolution_after_delivery(
1245            orch.as_ref(),
1246            &blocked,
1247            None
1248        ));
1249        assert!(routing_event_still_requires_resolution_after_delivery(
1250            orch.as_ref(),
1251            &missing,
1252            None
1253        ));
1254
1255        registry.reassign_task("T007", "worker").unwrap();
1256
1257        assert!(!routing_event_still_requires_resolution_after_delivery(
1258            orch.as_ref(),
1259            &blocked,
1260            None
1261        ));
1262        assert!(!routing_event_still_requires_resolution_after_delivery(
1263            orch.as_ref(),
1264            &missing,
1265            None
1266        ));
1267    }
1268
1269    #[test]
1270    fn unresolved_review_failure_classification_distinguishes_noop_from_textful_turn() {
1271        let envelope = TeamRoutingEnvelope {
1272            run_id: "run-review".into(),
1273            parent_run_id: None,
1274            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1275            fallback_session_keys: vec![],
1276            team_id: "team-test".into(),
1277            delivery_status: RoutingDeliveryStatus::NotRouted,
1278            event: TeamRoutingEvent::submitted("T008", "codex-beta", "done"),
1279            delivery_source: None,
1280        };
1281
1282        let (classification, reason) = unresolved_review_failure_from_turn_result(None, &envelope);
1283        assert_eq!(classification, ReviewFailureClassification::NoOp);
1284        assert!(reason.contains("produced no output"));
1285
1286        let (classification, reason) = unresolved_review_failure_from_turn_result(
1287            Some("I reviewed it but won't accept yet"),
1288            &envelope,
1289        );
1290        assert_eq!(
1291            classification,
1292            ReviewFailureClassification::StillRequiresResolution
1293        );
1294        assert!(reason.contains("produced output without resolving"));
1295    }
1296
1297    #[test]
1298    fn render_review_retry_delivery_text_includes_previous_failure_context() {
1299        let envelope = TeamRoutingEnvelope {
1300            run_id: "run-review".into(),
1301            parent_run_id: None,
1302            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1303            fallback_session_keys: vec![],
1304            team_id: "team-test".into(),
1305            delivery_status: RoutingDeliveryStatus::PersistedPending,
1306            event: TeamRoutingEvent::submitted("T010", "codex-beta", "done"),
1307            delivery_source: None,
1308        };
1309        let record = PendingRoutingRecord::from_envelope(envelope).note_failed_attempt(
1310            ReviewFailureClassification::StillRequiresResolution,
1311            "lead review turn completed without accept_task/reopen_task",
1312            None,
1313        );
1314
1315        let rendered = render_routing_event_for_delivery(&record);
1316        assert!(rendered.contains("[系统纠偏提醒]"));
1317        assert!(rendered.contains("此前同一控制面事件已失败 1 次"));
1318        assert!(rendered.contains("StillRequiresResolution"));
1319        assert!(rendered.contains("accept_task(task_id=\"T010\")"));
1320        assert!(rendered.contains("reopen_task(task_id=\"T010\", reason=\"...\")"));
1321    }
1322
1323    #[test]
1324    fn failed_and_blocked_reviews_can_resolve_via_post_update_after_user_notification() {
1325        let tmp = tempdir().unwrap();
1326        let registry = Arc::new(TaskRegistry::new_in_memory().unwrap());
1327        let session = Arc::new(TeamSession::from_dir("team-test", tmp.path().to_path_buf()));
1328        let dispatch_fn: crate::agent_core::team::heartbeat::DispatchFn =
1329            Arc::new(|_agent, _task| Box::pin(async { Ok(()) }));
1330        let orch = TeamOrchestrator::new(
1331            registry.clone(),
1332            session,
1333            dispatch_fn,
1334            std::time::Duration::from_secs(60),
1335        );
1336        orch.set_lead_agent_name("claude-alpha".into());
1337
1338        registry
1339            .create_task(CreateTask {
1340                id: "T009".into(),
1341                title: "failed-task".into(),
1342                assignee_hint: Some("codex-beta".into()),
1343                deps: vec![],
1344                timeout_secs: 60,
1345                spec: None,
1346                success_criteria: None,
1347            })
1348            .unwrap();
1349        registry.mark_failed("T009", "quota").unwrap();
1350
1351        let failed = TeamRoutingEnvelope {
1352            run_id: "run-9f".into(),
1353            parent_run_id: None,
1354            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1355            fallback_session_keys: vec![],
1356            team_id: "team-test".into(),
1357            delivery_status: RoutingDeliveryStatus::NotRouted,
1358            event: TeamRoutingEvent::failed("T009", "quota"),
1359            delivery_source: None,
1360        };
1361        let failed_review =
1362            crate::agent_core::team::completion_routing::PendingRoutingRecord::from_envelope(
1363                failed.clone(),
1364            );
1365        assert!(routing_event_still_requires_resolution_after_delivery(
1366            orch.as_ref(),
1367            &failed,
1368            failed_review.review.as_ref(),
1369        ));
1370        orch.begin_lead_review_attempt("run-9f", "T009", failed.event.kind.clone());
1371        assert!(orch.post_message("任务失败,是否重试请用户决定"));
1372        orch.end_lead_review_attempt("run-9f");
1373        assert!(!routing_event_still_requires_resolution_after_delivery(
1374            orch.as_ref(),
1375            &failed,
1376            failed_review.review.as_ref(),
1377        ));
1378
1379        registry
1380            .create_task(CreateTask {
1381                id: "T010".into(),
1382                title: "blocked-task".into(),
1383                assignee_hint: Some("codex-beta".into()),
1384                deps: vec![],
1385                timeout_secs: 60,
1386                spec: None,
1387                success_criteria: None,
1388            })
1389            .unwrap();
1390        registry.try_claim("T010", "codex-beta").unwrap();
1391        registry
1392            .hold_claim("T010", "codex-beta", "waiting_user")
1393            .unwrap();
1394
1395        let blocked = TeamRoutingEnvelope {
1396            run_id: "run-10b".into(),
1397            parent_run_id: None,
1398            requester_session_key: Some(SessionKey::new("lark", "group:test")),
1399            fallback_session_keys: vec![],
1400            team_id: "team-test".into(),
1401            delivery_status: RoutingDeliveryStatus::NotRouted,
1402            event: TeamRoutingEvent::blocked("T010", "codex-beta", "waiting_user"),
1403            delivery_source: None,
1404        };
1405        let blocked_review =
1406            crate::agent_core::team::completion_routing::PendingRoutingRecord::from_envelope(
1407                blocked.clone(),
1408            );
1409        assert!(routing_event_still_requires_resolution_after_delivery(
1410            orch.as_ref(),
1411            &blocked,
1412            blocked_review.review.as_ref(),
1413        ));
1414        orch.begin_lead_review_attempt("run-10b", "T010", blocked.event.kind.clone());
1415        assert!(orch.post_message("任务阻塞,需要用户决定下一步"));
1416        orch.end_lead_review_attempt("run-10b");
1417        assert!(!routing_event_still_requires_resolution_after_delivery(
1418            orch.as_ref(),
1419            &blocked,
1420            blocked_review.review.as_ref(),
1421        ));
1422    }
1423
1424    #[tokio::test]
1425    async fn capture_specialist_reply_text_prefers_direct_result() {
1426        let dir = tempdir().unwrap();
1427        let storage = crate::session::SessionStorage::new(dir.path().to_path_buf());
1428        let session_id = Uuid::new_v4();
1429
1430        let captured = capture_specialist_reply_text(
1431            &storage,
1432            session_id,
1433            &Ok(Some("direct result".to_string())),
1434        )
1435        .await
1436        .unwrap();
1437
1438        assert_eq!(captured.as_deref(), Some("direct result"));
1439    }
1440
1441    #[tokio::test]
1442    async fn capture_specialist_reply_text_falls_back_to_persisted_assistant_message() {
1443        let dir = tempdir().unwrap();
1444        let storage = crate::session::SessionStorage::new(dir.path().to_path_buf());
1445        let session_id = Uuid::new_v4();
1446        let session_dir = dir.path().join(session_id.to_string());
1447        std::fs::create_dir_all(&session_dir).unwrap();
1448        std::fs::write(
1449            session_dir.join("messages.jsonl"),
1450            format!(
1451                "{}\n{}\n",
1452                serde_json::json!({
1453                    "id": Uuid::new_v4(),
1454                    "role": "user",
1455                    "content": "spec",
1456                    "timestamp": chrono::Utc::now(),
1457                    "sender": "orchestrator",
1458                    "tool_calls": null,
1459                    "fragment_event_ids": null,
1460                    "aggregation_mode": null,
1461                }),
1462                serde_json::json!({
1463                    "id": Uuid::new_v4(),
1464                    "role": "assistant",
1465                    "content": "persisted assistant output",
1466                    "timestamp": chrono::Utc::now(),
1467                    "sender": "@codex-beta",
1468                    "tool_calls": null,
1469                    "fragment_event_ids": null,
1470                    "aggregation_mode": null,
1471                })
1472            ),
1473        )
1474        .unwrap();
1475
1476        let captured = capture_specialist_reply_text(&storage, session_id, &Ok(None))
1477            .await
1478            .unwrap();
1479
1480        assert_eq!(captured.as_deref(), Some("persisted assistant output"));
1481    }
1482
1483    #[test]
1484    fn missing_completion_excerpt_falls_back_to_runtime_error() {
1485        let excerpt =
1486            missing_completion_excerpt(&Err(anyhow::anyhow!("tool bridge unavailable")), None)
1487                .unwrap();
1488        assert!(excerpt.contains("runtime error: tool bridge unavailable"));
1489    }
1490
1491    #[test]
1492    fn missing_completion_excerpt_zero_output_ok_none() {
1493        let excerpt = missing_completion_excerpt(&Ok(None), None).unwrap();
1494        assert!(
1495            excerpt.contains("zero-output turn"),
1496            "Ok(None) should produce zero-output diagnostic, got: {excerpt}"
1497        );
1498        assert!(excerpt.contains("cold-start"));
1499    }
1500
1501    #[test]
1502    fn missing_completion_excerpt_zero_output_ok_empty_string() {
1503        let excerpt = missing_completion_excerpt(&Ok(Some(String::new())), None).unwrap();
1504        assert!(
1505            excerpt.contains("zero-output turn"),
1506            "Ok(Some(\"\")) should produce zero-output diagnostic, got: {excerpt}"
1507        );
1508    }
1509
1510    #[test]
1511    fn missing_completion_excerpt_nonempty_result_without_captured_returns_none() {
1512        // If capture_specialist_reply_text errored and captured_reply_text is None,
1513        // but result has actual text, we should NOT emit a misleading zero-output diagnostic.
1514        let excerpt = missing_completion_excerpt(&Ok(Some("actual output".to_string())), None);
1515        assert!(
1516            excerpt.is_none(),
1517            "Non-empty Ok(Some) without captured text should yield None, got: {excerpt:?}"
1518        );
1519    }
1520
1521    #[test]
1522    fn persist_missing_completion_reply_artifacts_writes_draft_result() {
1523        let tmp = tempdir().unwrap();
1524        let session = TeamSession::from_dir("team-test", tmp.path().to_path_buf());
1525
1526        persist_missing_completion_reply_artifacts(
1527            &session,
1528            "T005",
1529            "codex-beta",
1530            "raw specialist answer",
1531        )
1532        .unwrap();
1533
1534        let result = std::fs::read_to_string(session.task_dir("T005").join("result.md")).unwrap();
1535        let progress =
1536            std::fs::read_to_string(session.task_dir("T005").join("progress.md")).unwrap();
1537        assert!(result.contains("Draft Specialist Output"));
1538        assert!(result.contains("raw specialist answer"));
1539        assert!(progress.contains("preserved raw reply text"));
1540    }
1541
1542    #[test]
1543    fn public_milestone_visibility_respects_mode() {
1544        assert!(milestone_is_public(
1545            &TeamMilestoneEvent::LeadMessage {
1546                text: "hello".into()
1547            },
1548            TeamPublicUpdatesMode::Minimal
1549        ));
1550        assert!(!milestone_is_public(
1551            &TeamMilestoneEvent::AllTasksDone,
1552            TeamPublicUpdatesMode::Minimal
1553        ));
1554        assert!(milestone_is_public(
1555            &TeamMilestoneEvent::AllTasksDone,
1556            TeamPublicUpdatesMode::Normal
1557        ));
1558        assert!(!milestone_is_public(
1559            &TeamMilestoneEvent::TaskDone {
1560                task_id: "T1".into(),
1561                task_title: "task".into(),
1562                agent: "worker".into(),
1563                done_count: 1,
1564                total: 1,
1565            },
1566            TeamPublicUpdatesMode::Normal
1567        ));
1568        assert!(milestone_is_public(
1569            &TeamMilestoneEvent::TaskFailed {
1570                task_id: "T1".into(),
1571                agent: "worker".into(),
1572                reason: "boom".into(),
1573            },
1574            TeamPublicUpdatesMode::Normal
1575        ));
1576    }
1577
1578    #[tokio::test]
1579    async fn milestone_send_retries_without_reply_to() {
1580        let channel = Arc::new(MockChannel {
1581            sent: Mutex::new(Vec::new()),
1582            fail_when_reply_to: true,
1583        });
1584        let outbound = OutboundMsg {
1585            session_key: SessionKey::new("lark", "group:test"),
1586            content: MsgContent::text("hello"),
1587            reply_to: Some("reply-id".into()),
1588            thread_ts: None,
1589        };
1590        let (sent, result) = send_with_reply_fallback(
1591            &(channel.clone() as Arc<dyn crate::channels_internal::Channel>),
1592            outbound,
1593        )
1594        .await;
1595        result.unwrap();
1596        assert!(sent.reply_to.is_none());
1597        assert_eq!(channel.sent.lock().unwrap().len(), 1);
1598    }
1599
1600    #[test]
1601    fn dm_team_identity_uses_default_lark_instance() {
1602        let cfg = GatewayConfig {
1603            channels: ChannelsSection {
1604                lark: Some(LarkSection {
1605                    enabled: true,
1606                    presentation: ProgressPresentationMode::FinalOnly,
1607                    trigger_policy: None,
1608                    default_instance: Some("default".into()),
1609                    instances: vec![],
1610                }),
1611                ..Default::default()
1612            },
1613            ..Default::default()
1614        };
1615
1616        let lead_instance =
1617            default_channel_instance_for_scope(&cfg, "lark", "user:ou_test").expect("instance");
1618        let registered = SessionKey::with_instance("lark", &lead_instance, "user:ou_test");
1619        let inbound = SessionKey::with_instance("lark", "default", "user:ou_test");
1620
1621        assert_eq!(
1622            stable_team_id_for_session_key(&registered),
1623            stable_team_id_for_session_key(&inbound)
1624        );
1625    }
1626
1627    #[test]
1628    fn group_team_identity_does_not_force_default_lark_instance() {
1629        let cfg = GatewayConfig {
1630            channels: ChannelsSection {
1631                lark: Some(LarkSection {
1632                    enabled: true,
1633                    presentation: ProgressPresentationMode::FinalOnly,
1634                    trigger_policy: None,
1635                    default_instance: Some("default".into()),
1636                    instances: vec![],
1637                }),
1638                ..Default::default()
1639            },
1640            ..Default::default()
1641        };
1642
1643        assert_eq!(
1644            default_channel_instance_for_scope(&cfg, "lark", "group:oc_test"),
1645            None
1646        );
1647    }
1648}