1use super::*;
2
3impl LifecycleManager {
4 pub(super) async fn terminate(
12 &self,
13 session: &mut Session,
14 reason: TerminationReason,
15 ) -> Result<()> {
16 let terminal_status = match reason {
20 TerminationReason::RuntimeGone
21 | TerminationReason::AgentExited
22 | TerminationReason::NoHandle => SessionStatus::Killed,
23 };
24 if session.status != terminal_status {
25 self.transition(session, terminal_status).await?;
26 }
27 if let Some(engine) = self.reaction_engine.as_ref() {
28 engine.clear_all_for_session(&session.id);
29 }
30 self.idle_since
33 .lock()
34 .unwrap_or_else(|e| {
35 tracing::error!("lifecycle idle_since mutex poisoned; recovering inner state: {e}");
36 e.into_inner()
37 })
38 .remove(&session.id);
39 self.last_review_backlog_check
40 .lock()
41 .unwrap_or_else(|e| {
42 tracing::error!(
43 "last_review_backlog_check mutex poisoned; recovering inner state: {e}"
44 );
45 e.into_inner()
46 })
47 .remove(&session.id);
48 self.emit(OrchestratorEvent::Terminated {
49 id: session.id.clone(),
50 reason,
51 });
52 Ok(())
53 }
54
55 pub async fn transition(&self, session: &mut Session, to: SessionStatus) -> Result<()> {
72 if session.status == to {
73 return Ok(());
74 }
75 let from = session.status;
76 session.status = to;
77
78 match self.agent.cost_estimate(session).await {
86 Ok(Some(cost)) => {
87 let sid = session.id.0.clone();
89 let pid = session.project_id.clone();
90 let br = session.branch.clone();
91 let c = cost.clone();
92 let ca = session.created_at;
93 let ledger_result = tokio::task::spawn_blocking(move || {
94 crate::cost_ledger::record_cost(&sid, &pid, &br, &c, ca)
95 })
96 .await;
97 match ledger_result {
98 Ok(Err(e)) => {
99 tracing::warn!(session = %session.id, "cost ledger write failed: {e}");
100 }
101 Err(e) => {
102 tracing::warn!(session = %session.id, "cost ledger task panicked: {e}");
103 }
104 Ok(Ok(())) => {}
105 }
106 session.cost = Some(cost);
107 }
108 Ok(None) => {}
109 Err(e) => {
110 tracing::debug!(session = %session.id, "cost_estimate failed: {e}");
111 }
112 }
113
114 let mut persisted_to = to;
124 if let Some(engine) = self.reaction_engine.as_ref() {
125 if let Some(next_key) = status_to_reaction_key(to) {
126 match engine.dispatch(session, next_key).await {
127 Ok(Some(outcome))
128 if should_park_in_merge_failed(engine, session, to, next_key, &outcome) =>
129 {
130 persisted_to = SessionStatus::MergeFailed;
131 session.status = persisted_to;
132 }
133 Ok(_) => {}
134 Err(e) => {
135 tracing::warn!(
136 session = %session.id,
137 reaction = next_key,
138 error = %e,
139 "reaction dispatch failed; lifecycle loop continues"
140 );
141 }
142 }
143 }
144 }
145
146 if persisted_to == from {
150 session.status = from;
151 return Ok(());
152 }
153
154 self.sessions.save(session).await?;
155 self.emit(OrchestratorEvent::StatusChanged {
156 id: session.id.clone(),
157 from,
158 to: persisted_to,
159 });
160
161 if is_orchestrator_notifiable(persisted_to) {
166 self.notify_orchestrator(session, persisted_to).await;
167 }
168
169 if let Some(engine) = self.reaction_engine.as_ref() {
170 clear_tracker_on_transition(engine, &session.id, from, persisted_to);
176 }
177
178 Ok(())
179 }
180
181 pub(super) async fn notify_orchestrator(&self, worker: &Session, to: SessionStatus) {
187 let Some(orch_id) = worker.spawned_by.as_ref() else {
188 return;
189 };
190 let parent = match self.sessions.find_by_prefix(&orch_id.0).await {
191 Ok(p) => p,
192 Err(e) => {
193 tracing::debug!(
194 session = %worker.id,
195 parent = %orch_id.0,
196 "orchestrator session lookup failed: {e}"
197 );
198 return;
199 }
200 };
201 let Some(handle) = parent.runtime_handle.as_deref() else {
202 return;
203 };
204 let msg = format_orchestrator_notification(worker, to);
205 if let Err(e) = self.runtime.send_message(handle, &msg).await {
206 tracing::warn!(
207 session = %worker.id,
208 parent = %parent.id,
209 "failed to deliver orchestrator notification: {e}"
210 );
211 }
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::tests::{
        build_engine_with_ci_failed, fake_pr, fake_session, recv_timeout, unique_temp_dir,
        MockAgent, MockRuntime, MockScm,
    };
    use crate::reactions::{ReactionAction, ReactionConfig};
    use crate::scm::{CiStatus, PrState, ReviewDecision};
    use crate::session_manager::SessionManager;
    use std::collections::HashSet;

    /// Removes a test's temporary directory when dropped. The directory is
    /// therefore cleaned up even when an assertion fails part-way through a
    /// test, not only when the test reaches its last line.
    struct RemoveDirOnDrop<P: AsRef<std::path::Path>>(P);

    impl<P: AsRef<std::path::Path>> Drop for RemoveDirOnDrop<P> {
        fn drop(&mut self) {
            // Best-effort: a cleanup failure must not mask the test outcome.
            let _ = std::fs::remove_dir_all(self.0.as_ref());
        }
    }

    /// Collects events from receiver `$rx` until `recv_timeout` yields `None`,
    /// and returns them as a `Vec`.
    macro_rules! drain_events {
        ($rx:expr) => {{
            let mut events = Vec::new();
            while let Some(e) = recv_timeout(&mut $rx).await {
                events.push(e);
            }
            events
        }};
    }

    /// A tick that moves a Working session with a failing PR into `CiFailed`
    /// must emit `StatusChanged`. It must also emit the engine's
    /// `ReactionTriggered(SendToAgent)` on the lifecycle's broadcast channel
    /// and record one `ci-failed` attempt.
    #[tokio::test]
    async fn transition_into_ci_failed_dispatches_reaction_on_shared_channel() {
        let base = unique_temp_dir("reaction-transition");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle = LifecycleManager::new(sessions.clone(), lifecycle_runtime, agent);
        let engine = build_engine_with_ci_failed(&lifecycle, "fix CI please");
        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );

        let mut rx = lifecycle.subscribe();

        let mut s = fake_session("s1", "demo");
        s.status = SessionStatus::Working;
        sessions.save(&s).await.unwrap();

        scm.set_pr(Some(fake_pr(7, "ao-s1")));
        scm.set_state(PrState::Open);
        scm.set_ci(CiStatus::Failing);
        scm.set_review(ReviewDecision::None);

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let events = drain_events!(rx);

        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::StatusChanged {
                    to: SessionStatus::CiFailed,
                    ..
                }
            )),
            "expected StatusChanged to CiFailed, got {events:?}"
        );
        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    action: ReactionAction::SendToAgent,
                    ..
                }
            )),
            "expected ReactionTriggered(SendToAgent) from engine, got {events:?}"
        );

        assert_eq!(engine.attempts(&s.id, "ci-failed"), 1);
    }

    /// Transitioning out of `CiFailed` (here to `PrOpen`) must reset the
    /// engine's `ci-failed` attempt counter to zero.
    #[tokio::test]
    async fn leaving_reaction_status_clears_tracker() {
        let base = unique_temp_dir("reaction-clear");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let scm = Arc::new(MockScm::new());

        let lifecycle = LifecycleManager::new(sessions.clone(), lifecycle_runtime, agent);
        let engine = build_engine_with_ci_failed(&lifecycle, "fix");
        let lifecycle = Arc::new(
            lifecycle
                .with_reaction_engine(engine.clone())
                .with_scm(scm.clone() as Arc<dyn Scm>),
        );

        let mut s = fake_session("s1", "demo");
        s.status = SessionStatus::Working;
        sessions.save(&s).await.unwrap();

        scm.set_pr(Some(fake_pr(8, "ao-s1")));
        scm.set_state(PrState::Open);
        scm.set_ci(CiStatus::Failing);
        scm.set_review(ReviewDecision::None);
        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();
        assert_eq!(engine.attempts(&s.id, "ci-failed"), 1);

        let mut s2 = sessions.find_by_prefix("s1").await.unwrap();
        assert_eq!(s2.status, SessionStatus::CiFailed);

        lifecycle
            .transition(&mut s2, SessionStatus::PrOpen)
            .await
            .unwrap();
        assert_eq!(
            engine.attempts(&s2.id, "ci-failed"),
            0,
            "tracker should be cleared on exit from CiFailed"
        );
    }

    /// A tick that only moves a fresh session to `Working` must not produce
    /// any `ReactionTriggered` event. The engine only has a `ci-failed`
    /// reaction configured.
    #[tokio::test]
    async fn unrelated_transition_does_not_touch_reaction_engine() {
        let base = unique_temp_dir("no-react");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));

        let lifecycle = LifecycleManager::new(sessions.clone(), runtime, agent);
        let engine = build_engine_with_ci_failed(&lifecycle, "never fires");
        let lifecycle = Arc::new(lifecycle.with_reaction_engine(engine.clone()));

        let mut rx = lifecycle.subscribe();
        sessions.save(&fake_session("s1", "demo")).await.unwrap();
        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let events = drain_events!(rx);

        assert!(events.iter().any(|e| matches!(
            e,
            OrchestratorEvent::StatusChanged {
                to: SessionStatus::Working,
                ..
            }
        )));
        assert!(
            !events
                .iter()
                .any(|e| matches!(e, OrchestratorEvent::ReactionTriggered { .. })),
            "unexpected ReactionTriggered on Working transition: {events:?}"
        );
    }

    /// A worker with `spawned_by` set that transitions to `PrOpen` must send
    /// exactly one message to the parent's runtime handle. The message must
    /// mention the new status.
    #[tokio::test]
    async fn transition_notifies_parent_orchestrator_via_runtime() {
        let base = unique_temp_dir("orchestrator-notify");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let lifecycle = Arc::new(LifecycleManager::new(
            sessions.clone(),
            runtime.clone() as Arc<dyn Runtime>,
            agent,
        ));

        let mut parent = fake_session("orch1", "demo");
        parent.runtime_handle = Some("orch-handle".into());
        sessions.save(&parent).await.unwrap();

        let mut worker = fake_session("work1", "demo");
        worker.status = SessionStatus::Working;
        worker.spawned_by = Some(parent.id.clone());
        sessions.save(&worker).await.unwrap();

        lifecycle
            .transition(&mut worker, SessionStatus::PrOpen)
            .await
            .unwrap();

        let sends = runtime.sends();
        assert_eq!(
            sends.len(),
            1,
            "expected one notification to parent, got {sends:?}"
        );
        assert_eq!(sends[0].0, "orch-handle");
        assert!(
            sends[0].1.contains("pr_open"),
            "message should mention new status, got {:?}",
            sends[0].1
        );
    }

    /// A worker without `spawned_by` must not send any runtime message on a
    /// transition.
    #[tokio::test]
    async fn transition_without_spawned_by_sends_no_message() {
        let base = unique_temp_dir("orchestrator-notify-none");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let lifecycle = Arc::new(LifecycleManager::new(
            sessions.clone(),
            runtime.clone() as Arc<dyn Runtime>,
            agent,
        ));

        let mut worker = fake_session("lone1", "demo");
        worker.status = SessionStatus::Working;
        assert!(worker.spawned_by.is_none());
        sessions.save(&worker).await.unwrap();

        lifecycle
            .transition(&mut worker, SessionStatus::PrOpen)
            .await
            .unwrap();

        assert!(
            runtime.sends().is_empty(),
            "workers without spawned_by must not trigger a send"
        );
    }

    /// A transition into a status the orchestrator is not notified about
    /// (`Working`) must not message the parent, even when `spawned_by` is set.
    #[tokio::test]
    async fn transition_into_non_notifiable_status_sends_no_message() {
        let base = unique_temp_dir("orchestrator-notify-filter");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let runtime = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));
        let lifecycle = Arc::new(LifecycleManager::new(
            sessions.clone(),
            runtime.clone() as Arc<dyn Runtime>,
            agent,
        ));

        let mut parent = fake_session("orch2", "demo");
        parent.runtime_handle = Some("orch-handle".into());
        sessions.save(&parent).await.unwrap();

        let mut worker = fake_session("work2", "demo");
        worker.status = SessionStatus::Spawning;
        worker.spawned_by = Some(parent.id.clone());
        sessions.save(&worker).await.unwrap();

        lifecycle
            .transition(&mut worker, SessionStatus::Working)
            .await
            .unwrap();

        assert!(
            runtime.sends().is_empty(),
            "transition to Working should not notify orchestrator"
        );
    }

    /// With every session in `Done`, the first tick must fire the
    /// `all-complete` reaction and a second tick must not fire it again.
    #[tokio::test]
    async fn all_complete_fires_once_when_last_session_terminates() {
        let base = unique_temp_dir("all-complete");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));

        let lifecycle = LifecycleManager::new(sessions.clone(), lifecycle_runtime, agent);
        let engine_runtime = Arc::new(MockRuntime::new(true));
        let cfg = ReactionConfig::new(ReactionAction::Notify);
        let mut map = std::collections::HashMap::new();
        map.insert("all-complete".into(), cfg);
        let engine = Arc::new(ReactionEngine::new(
            map,
            engine_runtime as Arc<dyn Runtime>,
            lifecycle.events_sender(),
        ));
        let lifecycle = Arc::new(lifecycle.with_reaction_engine(engine.clone()));

        let mut rx = lifecycle.subscribe();

        let mut s = fake_session("s1", "demo");
        s.status = SessionStatus::Done;
        sessions.save(&s).await.unwrap();

        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();

        let events = drain_events!(rx);
        assert!(
            events.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    reaction_key,
                    ..
                } if reaction_key == "all-complete"
            )),
            "expected all-complete ReactionTriggered, got {events:?}"
        );

        lifecycle.tick(&mut seen).await.unwrap();
        let events2 = drain_events!(rx);
        assert!(
            !events2.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    reaction_key,
                    ..
                } if reaction_key == "all-complete"
            )),
            "all-complete must NOT re-fire on second tick: {events2:?}"
        );
    }

    /// After `all-complete` has fired, a new non-terminal session must clear
    /// the `all_complete_fired` flag. When that session also reaches `Done`,
    /// the reaction must fire again.
    #[tokio::test]
    async fn all_complete_resets_on_new_session() {
        use std::sync::atomic::Ordering;

        let base = unique_temp_dir("all-complete-reset");
        let _cleanup = RemoveDirOnDrop(base.clone());
        let sessions = Arc::new(SessionManager::new(base));
        let lifecycle_runtime: Arc<dyn Runtime> = Arc::new(MockRuntime::new(true));
        let agent: Arc<dyn Agent> = Arc::new(MockAgent::new(ActivityState::Ready));

        let lifecycle = LifecycleManager::new(sessions.clone(), lifecycle_runtime, agent);
        let engine_runtime = Arc::new(MockRuntime::new(true));
        let mut map = std::collections::HashMap::new();
        map.insert(
            "all-complete".into(),
            ReactionConfig::new(ReactionAction::Notify),
        );
        let engine = Arc::new(ReactionEngine::new(
            map,
            engine_runtime as Arc<dyn Runtime>,
            lifecycle.events_sender(),
        ));
        let lifecycle = Arc::new(lifecycle.with_reaction_engine(engine.clone()));
        let mut rx = lifecycle.subscribe();

        let mut s1 = fake_session("s1", "demo");
        s1.status = SessionStatus::Done;
        sessions.save(&s1).await.unwrap();
        let mut seen = HashSet::new();
        lifecycle.tick(&mut seen).await.unwrap();
        let _ = drain_events!(rx);

        let s2 = fake_session("s2", "demo");
        sessions.save(&s2).await.unwrap();
        lifecycle.tick(&mut seen).await.unwrap();
        let _ = drain_events!(rx);
        assert!(
            !lifecycle.all_complete_fired.load(Ordering::Relaxed),
            "flag must be reset when a non-terminal session appears"
        );

        let mut s2_done = sessions.find_by_prefix("s2").await.unwrap();
        s2_done.status = SessionStatus::Done;
        sessions.save(&s2_done).await.unwrap();
        lifecycle.tick(&mut seen).await.unwrap();
        let events3 = drain_events!(rx);
        assert!(
            events3.iter().any(|e| matches!(
                e,
                OrchestratorEvent::ReactionTriggered {
                    reaction_key,
                    ..
                } if reaction_key == "all-complete"
            )),
            "all-complete must re-fire after a new drain: {events3:?}"
        );
    }
}