1use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::sync::Arc;
19
20use crate::error::Result;
21use crate::typed_id::SessionId;
22
23#[cfg(feature = "openapi")]
24use utoipa::ToSchema;
25
26pub type TaskProgress = crate::background::BackgroundProgress;
28
29pub const TASK_KIND_SUBAGENT: &str = "subagent";
32pub const TASK_KIND_EXTERNAL_AGENT: &str = "external_agent";
33pub const TASK_KIND_BACKGROUND_TOOL: &str = "background_tool";
34pub const TASK_KIND_MONITOR: &str = "monitor";
37
38pub fn generate_task_id() -> String {
40 format!("task_{}", uuid::Uuid::now_v7().simple())
41}
42
43pub fn generate_task_message_id() -> String {
45 format!("tmsg_{}", uuid::Uuid::now_v7().simple())
46}
47
48#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
54#[cfg_attr(feature = "openapi", derive(ToSchema))]
55#[serde(rename_all = "snake_case")]
56pub enum SessionTaskState {
57 Queued,
58 Running,
59 AwaitingInput,
60 Succeeded,
61 Failed,
62 Canceled,
63}
64
65impl SessionTaskState {
66 pub fn is_terminal(&self) -> bool {
67 matches!(self, Self::Succeeded | Self::Failed | Self::Canceled)
68 }
69
70 pub fn parse(s: &str) -> Option<Self> {
75 match s {
76 "queued" => Some(Self::Queued),
77 "running" => Some(Self::Running),
78 "awaiting_input" => Some(Self::AwaitingInput),
79 "succeeded" => Some(Self::Succeeded),
80 "failed" => Some(Self::Failed),
81 "canceled" => Some(Self::Canceled),
82 _ => None,
83 }
84 }
85}
86
87impl std::fmt::Display for SessionTaskState {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 let s = match self {
90 Self::Queued => "queued",
91 Self::Running => "running",
92 Self::AwaitingInput => "awaiting_input",
93 Self::Succeeded => "succeeded",
94 Self::Failed => "failed",
95 Self::Canceled => "canceled",
96 };
97 write!(f, "{s}")
98 }
99}
100
101impl From<&str> for SessionTaskState {
102 fn from(s: &str) -> Self {
103 match s {
104 "running" => Self::Running,
105 "awaiting_input" => Self::AwaitingInput,
106 "succeeded" => Self::Succeeded,
107 "failed" => Self::Failed,
108 "canceled" => Self::Canceled,
109 _ => Self::Queued,
110 }
111 }
112}
113
114#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
116#[cfg_attr(feature = "openapi", derive(ToSchema))]
117#[serde(rename_all = "snake_case")]
118pub enum TaskWakePolicy {
119 #[default]
121 Silent,
122 OnTerminal,
124 OnActivity,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
130#[cfg_attr(feature = "openapi", derive(ToSchema))]
131pub struct TaskInputRequest {
132 pub id: String,
134 pub prompt: String,
136 #[serde(default, skip_serializing_if = "Option::is_none")]
138 #[cfg_attr(feature = "openapi", schema(value_type = Object))]
139 pub expected: Option<Value>,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
144#[cfg_attr(feature = "openapi", derive(ToSchema))]
145pub struct TaskError {
146 pub kind: String,
147 pub message: String,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
152#[cfg_attr(feature = "openapi", derive(ToSchema))]
153pub struct TaskArtifact {
154 pub name: String,
155 #[serde(rename = "type")]
157 pub artifact_type: String,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
160 pub path: Option<String>,
161 #[serde(default, skip_serializing_if = "Option::is_none")]
163 pub url: Option<String>,
164}
165
166#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
168#[cfg_attr(feature = "openapi", derive(ToSchema))]
169pub struct TaskLinks {
170 #[serde(default, skip_serializing_if = "Option::is_none")]
172 #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
173 pub child_session_id: Option<SessionId>,
174 #[serde(default, skip_serializing_if = "Option::is_none")]
176 pub remote_task_id: Option<String>,
177 #[serde(default, skip_serializing_if = "Vec::is_empty")]
179 pub resource_ids: Vec<String>,
180}
181
182impl TaskLinks {
183 pub fn is_empty(&self) -> bool {
184 self.child_session_id.is_none()
185 && self.remote_task_id.is_none()
186 && self.resource_ids.is_empty()
187 }
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
192#[cfg_attr(feature = "openapi", derive(ToSchema))]
193pub struct SessionTask {
194 pub id: String,
196 #[cfg_attr(feature = "openapi", schema(value_type = String))]
198 pub session_id: SessionId,
199 pub kind: String,
201 pub display_name: String,
203 #[serde(default)]
205 #[cfg_attr(feature = "openapi", schema(value_type = Object))]
206 pub spec: Value,
207 pub state: SessionTaskState,
208 #[serde(default, skip_serializing_if = "Option::is_none")]
210 pub state_detail: Option<String>,
211 #[serde(default, skip_serializing_if = "Option::is_none")]
212 pub progress: Option<TaskProgress>,
213 #[serde(default, skip_serializing_if = "Option::is_none")]
215 pub input_request: Option<TaskInputRequest>,
216 #[serde(default, skip_serializing_if = "Option::is_none")]
218 pub cancel_requested_at: Option<DateTime<Utc>>,
219 #[serde(default, skip_serializing_if = "Option::is_none")]
221 pub summary: Option<String>,
222 #[serde(default, skip_serializing_if = "Option::is_none")]
224 pub result_path: Option<String>,
225 #[serde(default, skip_serializing_if = "Vec::is_empty")]
226 pub artifacts: Vec<TaskArtifact>,
227 #[serde(default, skip_serializing_if = "Option::is_none")]
228 pub error: Option<TaskError>,
229 #[serde(default = "default_attempt")]
231 pub attempt: i32,
232 #[serde(default, skip_serializing_if = "Option::is_none")]
233 pub worker_id: Option<String>,
234 #[serde(default, skip_serializing_if = "Option::is_none")]
235 pub heartbeat_at: Option<DateTime<Utc>>,
236 #[serde(default, skip_serializing_if = "TaskLinks::is_empty")]
237 pub links: TaskLinks,
238 #[serde(default)]
239 pub wake_policy: TaskWakePolicy,
240 pub created_at: DateTime<Utc>,
241 #[serde(default, skip_serializing_if = "Option::is_none")]
242 pub started_at: Option<DateTime<Utc>>,
243 #[serde(default, skip_serializing_if = "Option::is_none")]
244 pub finished_at: Option<DateTime<Utc>>,
245 pub updated_at: DateTime<Utc>,
246}
247
248fn default_attempt() -> i32 {
249 1
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct CreateSessionTask {
255 pub session_id: SessionId,
256 #[serde(default)]
258 pub id: Option<String>,
259 pub kind: String,
260 pub display_name: String,
261 #[serde(default)]
262 pub spec: Value,
263 #[serde(default = "default_queued")]
265 pub state: SessionTaskState,
266 #[serde(default)]
267 pub links: TaskLinks,
268 #[serde(default)]
269 pub wake_policy: TaskWakePolicy,
270}
271
272fn default_queued() -> SessionTaskState {
273 SessionTaskState::Queued
274}
275
276#[derive(Debug, Clone, Default, Serialize, Deserialize)]
278pub struct SessionTaskUpdate {
279 pub state: Option<SessionTaskState>,
280 pub state_detail: Option<String>,
281 pub progress: Option<TaskProgress>,
282 pub input_request: Option<TaskInputRequest>,
284 pub summary: Option<String>,
285 pub result_path: Option<String>,
286 pub artifacts: Option<Vec<TaskArtifact>>,
288 pub error: Option<TaskError>,
289 pub links: Option<TaskLinks>,
291 pub worker_id: Option<String>,
292 pub heartbeat_at: Option<DateTime<Utc>>,
294 #[serde(default, skip_serializing_if = "Option::is_none")]
301 pub expected_attempt: Option<i32>,
302 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
307 pub increment_attempt: bool,
308}
309
310#[derive(Debug, Clone, Default)]
312pub struct SessionTaskFilter {
313 pub kind: Option<String>,
314 pub state: Option<SessionTaskState>,
315}
316
317pub fn apply_task_update(task: &mut SessionTask, update: SessionTaskUpdate, now: DateTime<Utc>) {
328 if let Some(expected) = update.expected_attempt
332 && expected != task.attempt
333 {
334 return;
335 }
336
337 let was_terminal = task.state.is_terminal();
338
339 if was_terminal
346 && let Some(state) = update.state
347 && state != task.state
348 {
349 return;
350 }
351
352 if update.increment_attempt {
355 task.attempt += 1;
356 }
357
358 let mut next_state = update.state;
359 if update.input_request.is_some() && !was_terminal {
360 next_state = Some(SessionTaskState::AwaitingInput);
361 }
362
363 if let Some(input_request) = update.input_request
364 && !was_terminal
365 {
366 task.input_request = Some(input_request);
367 }
368
369 if let Some(state) = next_state
370 && !was_terminal
371 && task.state != state
372 {
373 if task.state == SessionTaskState::Queued && state != SessionTaskState::Queued {
374 task.started_at.get_or_insert(now);
375 }
376 if state.is_terminal() {
377 task.finished_at.get_or_insert(now);
378 }
379 if state != SessionTaskState::AwaitingInput {
380 task.input_request = None;
381 }
382 task.state = state;
383 }
384
385 if let Some(detail) = update.state_detail {
386 task.state_detail = Some(detail);
387 }
388 if let Some(progress) = update.progress {
389 task.progress = Some(progress);
390 }
391 if let Some(summary) = update.summary {
392 task.summary = Some(summary);
393 }
394 if let Some(result_path) = update.result_path {
395 task.result_path = Some(result_path);
396 }
397 if let Some(artifacts) = update.artifacts {
398 task.artifacts = artifacts;
399 }
400 if let Some(error) = update.error {
401 task.error = Some(error);
402 }
403 if let Some(links) = update.links {
404 if links.child_session_id.is_some() {
405 task.links.child_session_id = links.child_session_id;
406 }
407 if links.remote_task_id.is_some() {
408 task.links.remote_task_id = links.remote_task_id;
409 }
410 for id in links.resource_ids {
411 if !task.links.resource_ids.contains(&id) {
412 task.links.resource_ids.push(id);
413 }
414 }
415 }
416 if let Some(worker_id) = update.worker_id {
417 task.worker_id = Some(worker_id);
418 }
419 if let Some(heartbeat_at) = update.heartbeat_at {
420 task.heartbeat_at = Some(heartbeat_at);
421 }
422
423 task.updated_at = now;
424}
425
426pub fn new_session_task(input: CreateSessionTask, now: DateTime<Utc>) -> SessionTask {
428 let state = input.state;
429 SessionTask {
430 id: input.id.unwrap_or_else(generate_task_id),
431 session_id: input.session_id,
432 kind: input.kind,
433 display_name: input.display_name,
434 spec: input.spec,
435 state,
436 state_detail: None,
437 progress: None,
438 input_request: None,
439 cancel_requested_at: None,
440 summary: None,
441 result_path: None,
442 artifacts: Vec::new(),
443 error: None,
444 attempt: 1,
445 worker_id: None,
446 heartbeat_at: None,
447 links: input.links,
448 wake_policy: input.wake_policy,
449 created_at: now,
450 started_at: if state == SessionTaskState::Queued {
451 None
452 } else {
453 Some(now)
454 },
455 finished_at: if state.is_terminal() { Some(now) } else { None },
456 updated_at: now,
457 }
458}
459
460#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
466#[cfg_attr(feature = "openapi", derive(ToSchema))]
467#[serde(rename_all = "snake_case")]
468pub enum TaskMessageDirection {
469 Inbound,
470 Outbound,
471}
472
473impl std::fmt::Display for TaskMessageDirection {
474 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475 match self {
476 Self::Inbound => write!(f, "inbound"),
477 Self::Outbound => write!(f, "outbound"),
478 }
479 }
480}
481
482impl From<&str> for TaskMessageDirection {
483 fn from(s: &str) -> Self {
484 match s {
485 "outbound" => Self::Outbound,
486 _ => Self::Inbound,
487 }
488 }
489}
490
491#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
493#[cfg_attr(feature = "openapi", derive(ToSchema))]
494#[serde(tag = "type", rename_all = "snake_case")]
495pub enum TaskMessagePart {
496 Text {
497 text: String,
498 },
499 Data {
500 #[cfg_attr(feature = "openapi", schema(value_type = Object))]
501 data: Value,
502 },
503}
504
505impl TaskMessagePart {
506 pub fn text(text: impl Into<String>) -> Self {
507 Self::Text { text: text.into() }
508 }
509}
510
511#[derive(Debug, Clone, Serialize, Deserialize)]
513#[cfg_attr(feature = "openapi", derive(ToSchema))]
514pub struct TaskMessage {
515 pub id: String,
517 pub task_id: String,
518 pub direction: TaskMessageDirection,
519 pub content: Vec<TaskMessagePart>,
520 #[serde(default, skip_serializing_if = "Option::is_none")]
522 pub in_reply_to: Option<String>,
523 pub created_at: DateTime<Utc>,
524}
525
526#[derive(Debug, Clone, Serialize, Deserialize)]
528pub struct NewTaskMessage {
529 pub direction: TaskMessageDirection,
530 pub content: Vec<TaskMessagePart>,
531 #[serde(default)]
532 pub in_reply_to: Option<String>,
533 #[serde(default, skip_serializing_if = "Option::is_none")]
538 pub expected_attempt: Option<i32>,
539}
540
541impl NewTaskMessage {
542 pub fn inbound_text(text: impl Into<String>) -> Self {
543 Self {
544 direction: TaskMessageDirection::Inbound,
545 content: vec![TaskMessagePart::text(text)],
546 in_reply_to: None,
547 expected_attempt: None,
548 }
549 }
550
551 pub fn outbound_text(text: impl Into<String>) -> Self {
552 Self {
553 direction: TaskMessageDirection::Outbound,
554 content: vec![TaskMessagePart::text(text)],
555 in_reply_to: None,
556 expected_attempt: None,
557 }
558 }
559
560 pub fn with_expected_attempt(mut self, attempt: i32) -> Self {
562 self.expected_attempt = Some(attempt);
563 self
564 }
565}
566
567pub fn task_message_text(content: &[TaskMessagePart]) -> String {
569 content
570 .iter()
571 .filter_map(|part| match part {
572 TaskMessagePart::Text { text } => Some(text.as_str()),
573 TaskMessagePart::Data { .. } => None,
574 })
575 .collect::<Vec<_>>()
576 .join("\n")
577}
578
579#[async_trait]
587pub trait SessionTaskRegistry: Send + Sync {
588 async fn create(&self, input: CreateSessionTask) -> Result<SessionTask>;
591
592 async fn update(
594 &self,
595 session_id: SessionId,
596 task_id: &str,
597 update: SessionTaskUpdate,
598 ) -> Result<Option<SessionTask>>;
599
600 async fn get(&self, session_id: SessionId, task_id: &str) -> Result<Option<SessionTask>>;
601
602 async fn list(
603 &self,
604 session_id: SessionId,
605 filter: Option<&SessionTaskFilter>,
606 ) -> Result<Vec<SessionTask>>;
607
608 async fn request_cancel(
611 &self,
612 session_id: SessionId,
613 task_id: &str,
614 ) -> Result<Option<SessionTask>>;
615
616 async fn record_message(
620 &self,
621 session_id: SessionId,
622 task_id: &str,
623 message: NewTaskMessage,
624 ) -> Result<TaskMessage>;
625
626 async fn list_messages(
633 &self,
634 session_id: SessionId,
635 task_id: &str,
636 limit: Option<u32>,
637 after_id: Option<&str>,
638 ) -> Result<Vec<TaskMessage>>;
639}
640
641#[async_trait]
651pub trait TaskExecutor: Send + Sync {
652 fn kind(&self) -> &str;
653
654 fn can_reattach(&self) -> bool {
662 false
663 }
664
665 fn can_reattach_task(&self, task: &SessionTask) -> bool {
672 let _ = task;
673 self.can_reattach()
674 }
675
676 async fn start(&self, task: &SessionTask, context: &crate::traits::ToolContext) -> Result<()> {
682 let _ = (task, context);
683 Err(crate::error::AgentLoopError::tool(format!(
684 "task kind '{}' does not support start via the registry",
685 self.kind()
686 )))
687 }
688
689 async fn deliver(
691 &self,
692 task: &SessionTask,
693 message: &TaskMessage,
694 context: &crate::traits::ToolContext,
695 ) -> Result<()> {
696 let _ = (task, message, context);
697 Err(crate::error::AgentLoopError::tool(format!(
698 "task kind '{}' does not accept inbound messages",
699 self.kind()
700 )))
701 }
702
703 async fn cancel(&self, task: &SessionTask, context: &crate::traits::ToolContext) -> Result<()>;
705
706 async fn reconcile(
709 &self,
710 task: &SessionTask,
711 context: &crate::traits::ToolContext,
712 ) -> Result<()> {
713 let _ = (task, context);
714 Ok(())
715 }
716}
717
718pub struct TaskExecutorPlugin {
721 pub executor: fn() -> Arc<dyn TaskExecutor>,
722}
723
724inventory::collect!(TaskExecutorPlugin);
725
726pub fn find_task_executor(kind: &str) -> Option<Arc<dyn TaskExecutor>> {
728 inventory::iter::<TaskExecutorPlugin>
729 .into_iter()
730 .map(|plugin| (plugin.executor)())
731 .find(|executor| executor.kind() == kind)
732}
733
734#[async_trait]
742pub trait TaskSink: Send + Sync {
743 async fn state(&self, state: SessionTaskState, detail: Option<String>) -> Result<()>;
744
745 async fn progress(&self, progress: TaskProgress) -> Result<()>;
746
747 async fn output(&self, stream: &str, delta: &str) -> Result<()>;
749
750 async fn post(&self, message: NewTaskMessage) -> Result<()>;
752
753 async fn request_input(&self, request: TaskInputRequest) -> Result<()>;
755
756 async fn artifact(&self, artifact: TaskArtifact) -> Result<()>;
757}
758
759pub struct RegistryTaskSink {
766 registry: Arc<dyn SessionTaskRegistry>,
767 session_id: SessionId,
768 task_id: String,
769 attempt: i32,
771}
772
773impl RegistryTaskSink {
774 pub fn new(
775 registry: Arc<dyn SessionTaskRegistry>,
776 session_id: SessionId,
777 task_id: String,
778 ) -> Self {
779 Self {
780 registry,
781 session_id,
782 task_id,
783 attempt: 1,
784 }
785 }
786
787 pub fn with_attempt(mut self, attempt: i32) -> Self {
790 self.attempt = attempt;
791 self
792 }
793}
794
795#[async_trait]
796impl TaskSink for RegistryTaskSink {
797 async fn state(&self, state: SessionTaskState, detail: Option<String>) -> Result<()> {
798 self.registry
799 .update(
800 self.session_id,
801 &self.task_id,
802 SessionTaskUpdate {
803 state: Some(state),
804 state_detail: detail,
805 expected_attempt: Some(self.attempt),
806 ..Default::default()
807 },
808 )
809 .await?;
810 Ok(())
811 }
812
813 async fn progress(&self, progress: TaskProgress) -> Result<()> {
814 self.registry
815 .update(
816 self.session_id,
817 &self.task_id,
818 SessionTaskUpdate {
819 progress: Some(progress),
820 expected_attempt: Some(self.attempt),
821 ..Default::default()
822 },
823 )
824 .await?;
825 Ok(())
826 }
827
828 async fn output(&self, _stream: &str, _delta: &str) -> Result<()> {
829 Ok(())
830 }
831
832 async fn post(&self, message: NewTaskMessage) -> Result<()> {
833 self.registry
836 .record_message(
837 self.session_id,
838 &self.task_id,
839 message.with_expected_attempt(self.attempt),
840 )
841 .await?;
842 Ok(())
843 }
844
845 async fn request_input(&self, request: TaskInputRequest) -> Result<()> {
846 self.registry
847 .update(
848 self.session_id,
849 &self.task_id,
850 SessionTaskUpdate {
851 input_request: Some(request),
852 expected_attempt: Some(self.attempt),
853 ..Default::default()
854 },
855 )
856 .await?;
857 Ok(())
858 }
859
860 async fn artifact(&self, artifact: TaskArtifact) -> Result<()> {
861 let Some(task) = self.registry.get(self.session_id, &self.task_id).await? else {
862 return Ok(());
863 };
864 if task.attempt != self.attempt {
866 return Ok(());
867 }
868 let mut artifacts = task.artifacts;
869 artifacts.push(artifact);
870 self.registry
871 .update(
872 self.session_id,
873 &self.task_id,
874 SessionTaskUpdate {
875 artifacts: Some(artifacts),
876 expected_attempt: Some(self.attempt),
877 ..Default::default()
878 },
879 )
880 .await?;
881 Ok(())
882 }
883}
884
885pub fn task_vfs_dir(task_id: &str) -> String {
887 format!("/.tasks/{task_id}")
888}
889
890pub fn task_result_path(task_id: &str) -> String {
892 format!("/.tasks/{task_id}/result.json")
893}
894
895#[cfg(test)]
896mod tests {
897 use super::*;
898
899 fn task() -> SessionTask {
900 new_session_task(
901 CreateSessionTask {
902 session_id: SessionId::new(),
903 id: None,
904 kind: TASK_KIND_BACKGROUND_TOOL.to_string(),
905 display_name: "Test".to_string(),
906 spec: serde_json::json!({}),
907 state: SessionTaskState::Queued,
908 links: TaskLinks::default(),
909 wake_policy: TaskWakePolicy::Silent,
910 },
911 Utc::now(),
912 )
913 }
914
915 #[test]
916 fn create_generates_prefixed_id() {
917 let t = task();
918 assert!(t.id.starts_with("task_"));
919 assert_eq!(t.state, SessionTaskState::Queued);
920 assert!(t.started_at.is_none());
921 }
922
923 #[test]
924 fn first_transition_out_of_queued_stamps_started_at() {
925 let mut t = task();
926 let now = Utc::now();
927 apply_task_update(
928 &mut t,
929 SessionTaskUpdate {
930 state: Some(SessionTaskState::Running),
931 ..Default::default()
932 },
933 now,
934 );
935 assert_eq!(t.state, SessionTaskState::Running);
936 assert_eq!(t.started_at, Some(now));
937 assert!(t.finished_at.is_none());
938 }
939
940 #[test]
941 fn terminal_transition_stamps_finished_at_and_is_final() {
942 let mut t = task();
943 let now = Utc::now();
944 apply_task_update(
945 &mut t,
946 SessionTaskUpdate {
947 state: Some(SessionTaskState::Succeeded),
948 summary: Some("done".to_string()),
949 ..Default::default()
950 },
951 now,
952 );
953 assert_eq!(t.state, SessionTaskState::Succeeded);
954 assert_eq!(t.finished_at, Some(now));
955
956 apply_task_update(
961 &mut t,
962 SessionTaskUpdate {
963 state: Some(SessionTaskState::Failed),
964 error: Some(TaskError {
965 kind: "orphaned".to_string(),
966 message: "stale".to_string(),
967 }),
968 ..Default::default()
969 },
970 Utc::now(),
971 );
972 assert_eq!(t.state, SessionTaskState::Succeeded);
973 assert!(t.error.is_none());
974
975 apply_task_update(
977 &mut t,
978 SessionTaskUpdate {
979 state: Some(SessionTaskState::Succeeded),
980 result_path: Some("/.tasks/x/result.json".to_string()),
981 ..Default::default()
982 },
983 Utc::now(),
984 );
985 assert_eq!(t.result_path.as_deref(), Some("/.tasks/x/result.json"));
986
987 apply_task_update(
989 &mut t,
990 SessionTaskUpdate {
991 summary: Some("enriched".to_string()),
992 ..Default::default()
993 },
994 Utc::now(),
995 );
996 assert_eq!(t.summary.as_deref(), Some("enriched"));
997 }
998
999 #[test]
1000 fn input_request_forces_awaiting_input_and_clears_on_resume() {
1001 let mut t = task();
1002 apply_task_update(
1003 &mut t,
1004 SessionTaskUpdate {
1005 input_request: Some(TaskInputRequest {
1006 id: "req_1".to_string(),
1007 prompt: "Approve?".to_string(),
1008 expected: None,
1009 }),
1010 ..Default::default()
1011 },
1012 Utc::now(),
1013 );
1014 assert_eq!(t.state, SessionTaskState::AwaitingInput);
1015 assert!(t.input_request.is_some());
1016
1017 apply_task_update(
1018 &mut t,
1019 SessionTaskUpdate {
1020 state: Some(SessionTaskState::Running),
1021 ..Default::default()
1022 },
1023 Utc::now(),
1024 );
1025 assert_eq!(t.state, SessionTaskState::Running);
1026 assert!(t.input_request.is_none());
1027 }
1028
1029 #[test]
1030 fn links_merge_without_duplicates() {
1031 let mut t = task();
1032 let child = SessionId::new();
1033 apply_task_update(
1034 &mut t,
1035 SessionTaskUpdate {
1036 links: Some(TaskLinks {
1037 child_session_id: Some(child),
1038 remote_task_id: None,
1039 resource_ids: vec!["res_1".to_string()],
1040 }),
1041 ..Default::default()
1042 },
1043 Utc::now(),
1044 );
1045 apply_task_update(
1046 &mut t,
1047 SessionTaskUpdate {
1048 links: Some(TaskLinks {
1049 child_session_id: None,
1050 remote_task_id: Some("rt_1".to_string()),
1051 resource_ids: vec!["res_1".to_string(), "res_2".to_string()],
1052 }),
1053 ..Default::default()
1054 },
1055 Utc::now(),
1056 );
1057 assert_eq!(t.links.child_session_id, Some(child));
1058 assert_eq!(t.links.remote_task_id.as_deref(), Some("rt_1"));
1059 assert_eq!(t.links.resource_ids, vec!["res_1", "res_2"]);
1060 }
1061
1062 #[test]
1063 fn message_text_rendering() {
1064 let msg = NewTaskMessage::outbound_text("hello");
1065 assert_eq!(task_message_text(&msg.content), "hello");
1066 }
1067
1068 #[test]
1073 fn update_with_matching_attempt_applies() {
1074 let mut t = task();
1075 assert_eq!(t.attempt, 1);
1077 let now = Utc::now();
1078
1079 apply_task_update(
1081 &mut t,
1082 SessionTaskUpdate {
1083 state: Some(SessionTaskState::Running),
1084 state_detail: Some("step 1".to_string()),
1085 heartbeat_at: Some(now),
1086 expected_attempt: Some(1),
1087 ..Default::default()
1088 },
1089 now,
1090 );
1091 assert_eq!(t.state, SessionTaskState::Running);
1092 assert_eq!(t.state_detail.as_deref(), Some("step 1"));
1093 assert_eq!(t.heartbeat_at, Some(now));
1094 }
1095
1096 #[test]
1097 fn update_with_stale_attempt_is_fully_ignored() {
1098 let mut t = task();
1099 t.attempt = 2;
1101
1102 let now = Utc::now();
1103 let before_updated = t.updated_at;
1104
1105 apply_task_update(
1108 &mut t,
1109 SessionTaskUpdate {
1110 state: Some(SessionTaskState::Running),
1111 state_detail: Some("superseded".to_string()),
1112 heartbeat_at: Some(now),
1113 expected_attempt: Some(1),
1114 ..Default::default()
1115 },
1116 now,
1117 );
1118 assert_eq!(t.state, SessionTaskState::Queued, "state must be unchanged");
1119 assert!(t.state_detail.is_none(), "state_detail must be unchanged");
1120 assert!(t.heartbeat_at.is_none(), "heartbeat must be unchanged");
1121 assert_eq!(t.updated_at, before_updated, "updated_at must be unchanged");
1122 }
1123
1124 #[test]
1125 fn update_with_none_expected_attempt_applies_regardless() {
1126 let mut t = task();
1127 t.attempt = 99;
1129 let now = Utc::now();
1130
1131 apply_task_update(
1132 &mut t,
1133 SessionTaskUpdate {
1134 summary: Some("cancel from API".to_string()),
1135 expected_attempt: None,
1136 ..Default::default()
1137 },
1138 now,
1139 );
1140 assert_eq!(t.summary.as_deref(), Some("cancel from API"));
1142 }
1143
1144 #[test]
1145 fn reaper_update_increments_attempt_and_fences_old_executor() {
1146 let mut t = task();
1147 t.state = SessionTaskState::Running;
1148 assert_eq!(t.attempt, 1);
1149 let now = Utc::now();
1150
1151 apply_task_update(
1153 &mut t,
1154 SessionTaskUpdate {
1155 state: Some(SessionTaskState::Failed),
1156 error: Some(TaskError {
1157 kind: "orphaned".to_string(),
1158 message: "worker heartbeat stopped".to_string(),
1159 }),
1160 increment_attempt: true,
1161 ..Default::default()
1162 },
1163 now,
1164 );
1165 assert_eq!(t.state, SessionTaskState::Failed);
1166 assert_eq!(t.attempt, 2, "orphan reap must supersede the attempt");
1167
1168 let later = now + chrono::Duration::seconds(5);
1171 apply_task_update(
1172 &mut t,
1173 SessionTaskUpdate {
1174 heartbeat_at: Some(later),
1175 expected_attempt: Some(1),
1176 ..Default::default()
1177 },
1178 later,
1179 );
1180 assert_ne!(
1181 t.heartbeat_at,
1182 Some(later),
1183 "stale heartbeat must be rejected"
1184 );
1185 }
1186
1187 #[test]
1188 fn increment_attempt_is_inert_when_update_is_dropped() {
1189 let mut t = task();
1190 t.state = SessionTaskState::Succeeded;
1191 assert_eq!(t.attempt, 1);
1192 let now = Utc::now();
1193
1194 apply_task_update(
1197 &mut t,
1198 SessionTaskUpdate {
1199 state: Some(SessionTaskState::Failed),
1200 error: Some(TaskError {
1201 kind: "orphaned".to_string(),
1202 message: "worker heartbeat stopped".to_string(),
1203 }),
1204 increment_attempt: true,
1205 ..Default::default()
1206 },
1207 now,
1208 );
1209 assert_eq!(t.state, SessionTaskState::Succeeded);
1210 assert_eq!(t.attempt, 1, "dropped update must not bump the attempt");
1211 }
1212}