Skip to main content

everruns_core/
session_task.rs

1// Session tasks — unified registry of background work owned by a session.
2//
3// See specs/session-tasks.md. A task is any asynchronous work a session owns
4// (subagent, external A2A agent, background tool, monitor). The registry owns
5// the record, lifecycle invariants, and task.* events; capabilities plug in
6// `TaskExecutor`s (control plane) and report through `TaskSink` (report plane).
7//
8// Decision: lifecycle invariants live in `apply_task_update` so every backend
9// (PostgreSQL, in-memory, gRPC) applies identical semantics.
10// Decision: kind is a free-form string for extensibility — no enum.
11// Decision: cancellation is cooperative — `request_cancel` records intent via
12// `cancel_requested_at`; executors wind down and report the terminal state.
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::sync::Arc;
19
20use crate::error::Result;
21use crate::typed_id::SessionId;
22
23#[cfg(feature = "openapi")]
24use utoipa::ToSchema;
25
26/// Progress shape shared with background tool execution.
27pub type TaskProgress = crate::background::BackgroundProgress;
28
29/// Well-known task kinds. Kind stays a free-form string; these constants
30/// cover the built-in executors.
31pub const TASK_KIND_SUBAGENT: &str = "subagent";
32pub const TASK_KIND_EXTERNAL_AGENT: &str = "external_agent";
33pub const TASK_KIND_BACKGROUND_TOOL: &str = "background_tool";
34/// Long-lived monitor task linked to a session schedule. Stays `running`
35/// until the linked schedule is exhausted (one-shot) or `cancel_task` is called.
36pub const TASK_KIND_MONITOR: &str = "monitor";
37
38/// Generate a new task ID (`task_` prefix per specs/id-schema.md).
39pub fn generate_task_id() -> String {
40    format!("task_{}", uuid::Uuid::now_v7().simple())
41}
42
43/// Generate a new task message ID.
44pub fn generate_task_message_id() -> String {
45    format!("tmsg_{}", uuid::Uuid::now_v7().simple())
46}
47
48/// Lifecycle state of a session task.
49///
50/// Three classes: active (`queued`, `running`), interrupted (`awaiting_input`,
51/// resumable), terminal (`succeeded`, `failed`, `canceled`). Timeout and
52/// rejection are `error.kind` values on `failed`, not states.
53#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
54#[cfg_attr(feature = "openapi", derive(ToSchema))]
55#[serde(rename_all = "snake_case")]
56pub enum SessionTaskState {
57    Queued,
58    Running,
59    AwaitingInput,
60    Succeeded,
61    Failed,
62    Canceled,
63}
64
65impl SessionTaskState {
66    pub fn is_terminal(&self) -> bool {
67        matches!(self, Self::Succeeded | Self::Failed | Self::Canceled)
68    }
69
70    /// Strict parser for caller-supplied state strings (API filters, tool
71    /// arguments). Unlike `From<&str>` — which exists for trusted,
72    /// CHECK-constrained storage values and defaults to `Queued` — this
73    /// returns None for unknown input so callers can reject it.
74    pub fn parse(s: &str) -> Option<Self> {
75        match s {
76            "queued" => Some(Self::Queued),
77            "running" => Some(Self::Running),
78            "awaiting_input" => Some(Self::AwaitingInput),
79            "succeeded" => Some(Self::Succeeded),
80            "failed" => Some(Self::Failed),
81            "canceled" => Some(Self::Canceled),
82            _ => None,
83        }
84    }
85}
86
87impl std::fmt::Display for SessionTaskState {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        let s = match self {
90            Self::Queued => "queued",
91            Self::Running => "running",
92            Self::AwaitingInput => "awaiting_input",
93            Self::Succeeded => "succeeded",
94            Self::Failed => "failed",
95            Self::Canceled => "canceled",
96        };
97        write!(f, "{s}")
98    }
99}
100
101impl From<&str> for SessionTaskState {
102    fn from(s: &str) -> Self {
103        match s {
104            "running" => Self::Running,
105            "awaiting_input" => Self::AwaitingInput,
106            "succeeded" => Self::Succeeded,
107            "failed" => Self::Failed,
108            "canceled" => Self::Canceled,
109            _ => Self::Queued,
110        }
111    }
112}
113
114/// When outbound task activity wakes the owning session's agent.
115#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
116#[cfg_attr(feature = "openapi", derive(ToSchema))]
117#[serde(rename_all = "snake_case")]
118pub enum TaskWakePolicy {
119    /// Never wake; the agent polls via `get_task`/`list_tasks`.
120    #[default]
121    Silent,
122    /// Wake on transition to a terminal state.
123    OnTerminal,
124    /// Wake on any outbound message or input request, and on terminal states.
125    OnActivity,
126}
127
128/// Structured ask posted by a task that needs input to continue.
129#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
130#[cfg_attr(feature = "openapi", derive(ToSchema))]
131pub struct TaskInputRequest {
132    /// Stable ID referenced by the answering message's `in_reply_to`.
133    pub id: String,
134    /// Human/agent-readable prompt.
135    pub prompt: String,
136    /// Optional machine-readable description of the expected answer.
137    #[serde(default, skip_serializing_if = "Option::is_none")]
138    #[cfg_attr(feature = "openapi", schema(value_type = Object))]
139    pub expected: Option<Value>,
140}
141
142/// Terminal error detail. Timeout/rejection/orphaned are kinds, not states.
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
144#[cfg_attr(feature = "openapi", derive(ToSchema))]
145pub struct TaskError {
146    pub kind: String,
147    pub message: String,
148}
149
150/// Typed link to something the task produced.
151#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
152#[cfg_attr(feature = "openapi", derive(ToSchema))]
153pub struct TaskArtifact {
154    pub name: String,
155    /// Artifact type: "file", "url", "session", "pr", etc.
156    #[serde(rename = "type")]
157    pub artifact_type: String,
158    /// Session VFS path, when the artifact lives in the session filesystem.
159    #[serde(default, skip_serializing_if = "Option::is_none")]
160    pub path: Option<String>,
161    /// External URL, when the artifact lives elsewhere.
162    #[serde(default, skip_serializing_if = "Option::is_none")]
163    pub url: Option<String>,
164}
165
166/// Cross-references owned by a task.
167#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
168#[cfg_attr(feature = "openapi", derive(ToSchema))]
169pub struct TaskLinks {
170    /// Child session, for subagent-shaped tasks. Full transcript lives there.
171    #[serde(default, skip_serializing_if = "Option::is_none")]
172    #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
173    pub child_session_id: Option<SessionId>,
174    /// Remote task ID, for tasks wrapping an external protocol task (A2A).
175    #[serde(default, skip_serializing_if = "Option::is_none")]
176    pub remote_task_id: Option<String>,
177    /// Session resources (sandboxes, browser sessions) this task holds.
178    #[serde(default, skip_serializing_if = "Vec::is_empty")]
179    pub resource_ids: Vec<String>,
180}
181
182impl TaskLinks {
183    pub fn is_empty(&self) -> bool {
184        self.child_session_id.is_none()
185            && self.remote_task_id.is_none()
186            && self.resource_ids.is_empty()
187    }
188}
189
190/// A unit of background work owned by a session.
191#[derive(Debug, Clone, Serialize, Deserialize)]
192#[cfg_attr(feature = "openapi", derive(ToSchema))]
193pub struct SessionTask {
194    /// `task_*` public ID.
195    pub id: String,
196    /// Owning session.
197    #[cfg_attr(feature = "openapi", schema(value_type = String))]
198    pub session_id: SessionId,
199    /// Task kind: "subagent", "external_agent", "background_tool", "monitor", …
200    pub kind: String,
201    /// Human-readable label.
202    pub display_name: String,
203    /// Kind-specific input (instructions, tool args, external agent id).
204    #[serde(default)]
205    #[cfg_attr(feature = "openapi", schema(value_type = Object))]
206    pub spec: Value,
207    pub state: SessionTaskState,
208    /// Short live status line ("polling remote task", "iteration 4/10").
209    #[serde(default, skip_serializing_if = "Option::is_none")]
210    pub state_detail: Option<String>,
211    #[serde(default, skip_serializing_if = "Option::is_none")]
212    pub progress: Option<TaskProgress>,
213    /// Pending ask while `awaiting_input`; cleared when answered.
214    #[serde(default, skip_serializing_if = "Option::is_none")]
215    pub input_request: Option<TaskInputRequest>,
216    /// Cooperative cancel intent. A flag, not a state.
217    #[serde(default, skip_serializing_if = "Option::is_none")]
218    pub cancel_requested_at: Option<DateTime<Utc>>,
219    /// Human-readable outcome.
220    #[serde(default, skip_serializing_if = "Option::is_none")]
221    pub summary: Option<String>,
222    /// Machine result in the session VFS: `/.tasks/{task_id}/result.json`.
223    #[serde(default, skip_serializing_if = "Option::is_none")]
224    pub result_path: Option<String>,
225    #[serde(default, skip_serializing_if = "Vec::is_empty")]
226    pub artifacts: Vec<TaskArtifact>,
227    #[serde(default, skip_serializing_if = "Option::is_none")]
228    pub error: Option<TaskError>,
229    /// Execution attempt, starting at 1. Incremented on re-attach.
230    #[serde(default = "default_attempt")]
231    pub attempt: i32,
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    pub worker_id: Option<String>,
234    #[serde(default, skip_serializing_if = "Option::is_none")]
235    pub heartbeat_at: Option<DateTime<Utc>>,
236    #[serde(default, skip_serializing_if = "TaskLinks::is_empty")]
237    pub links: TaskLinks,
238    #[serde(default)]
239    pub wake_policy: TaskWakePolicy,
240    pub created_at: DateTime<Utc>,
241    #[serde(default, skip_serializing_if = "Option::is_none")]
242    pub started_at: Option<DateTime<Utc>>,
243    #[serde(default, skip_serializing_if = "Option::is_none")]
244    pub finished_at: Option<DateTime<Utc>>,
245    pub updated_at: DateTime<Utc>,
246}
247
248fn default_attempt() -> i32 {
249    1
250}
251
252/// Input for creating a task.
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct CreateSessionTask {
255    pub session_id: SessionId,
256    /// Caller-supplied ID for idempotent creation; generated when None.
257    #[serde(default)]
258    pub id: Option<String>,
259    pub kind: String,
260    pub display_name: String,
261    #[serde(default)]
262    pub spec: Value,
263    /// Initial state; defaults to Queued.
264    #[serde(default = "default_queued")]
265    pub state: SessionTaskState,
266    #[serde(default)]
267    pub links: TaskLinks,
268    #[serde(default)]
269    pub wake_policy: TaskWakePolicy,
270}
271
272fn default_queued() -> SessionTaskState {
273    SessionTaskState::Queued
274}
275
276/// Partial update applied through `apply_task_update`. None = unchanged.
277#[derive(Debug, Clone, Default, Serialize, Deserialize)]
278pub struct SessionTaskUpdate {
279    pub state: Option<SessionTaskState>,
280    pub state_detail: Option<String>,
281    pub progress: Option<TaskProgress>,
282    /// Setting an input request implies `awaiting_input`.
283    pub input_request: Option<TaskInputRequest>,
284    pub summary: Option<String>,
285    pub result_path: Option<String>,
286    /// Replaces the artifact list when set.
287    pub artifacts: Option<Vec<TaskArtifact>>,
288    pub error: Option<TaskError>,
289    /// Merged field-by-field into existing links.
290    pub links: Option<TaskLinks>,
291    pub worker_id: Option<String>,
292    /// Liveness heartbeat timestamp.
293    pub heartbeat_at: Option<DateTime<Utc>>,
294}
295
296/// Optional filter for listing tasks.
297#[derive(Debug, Clone, Default)]
298pub struct SessionTaskFilter {
299    pub kind: Option<String>,
300    pub state: Option<SessionTaskState>,
301}
302
303/// Apply a partial update to a task, enforcing lifecycle invariants.
304///
305/// All registry backends route updates through this function so semantics
306/// stay identical across PostgreSQL, in-memory, and gRPC modes:
307/// - terminal states are final: state changes on a terminal task are ignored
308///   (content fields like summary/result still apply);
309/// - first transition out of `queued` stamps `started_at`;
310/// - transition into a terminal state stamps `finished_at`;
311/// - setting `input_request` forces `awaiting_input`; leaving
312///   `awaiting_input` clears it.
313pub fn apply_task_update(task: &mut SessionTask, update: SessionTaskUpdate, now: DateTime<Utc>) {
314    let was_terminal = task.state.is_terminal();
315
316    // Terminal states are final. An update that asks for a *different* state
317    // on an already-terminal task lost a race (e.g. the reaper marking a task
318    // orphaned after it succeeded) — ignore it entirely so its content fields
319    // (error, summary) cannot corrupt the terminal record. Updates that carry
320    // the same terminal state (idempotent re-mirrors) or no state at all
321    // (content enrichment) still apply below.
322    if was_terminal
323        && let Some(state) = update.state
324        && state != task.state
325    {
326        return;
327    }
328
329    let mut next_state = update.state;
330    if update.input_request.is_some() && !was_terminal {
331        next_state = Some(SessionTaskState::AwaitingInput);
332    }
333
334    if let Some(input_request) = update.input_request
335        && !was_terminal
336    {
337        task.input_request = Some(input_request);
338    }
339
340    if let Some(state) = next_state
341        && !was_terminal
342        && task.state != state
343    {
344        if task.state == SessionTaskState::Queued && state != SessionTaskState::Queued {
345            task.started_at.get_or_insert(now);
346        }
347        if state.is_terminal() {
348            task.finished_at.get_or_insert(now);
349        }
350        if state != SessionTaskState::AwaitingInput {
351            task.input_request = None;
352        }
353        task.state = state;
354    }
355
356    if let Some(detail) = update.state_detail {
357        task.state_detail = Some(detail);
358    }
359    if let Some(progress) = update.progress {
360        task.progress = Some(progress);
361    }
362    if let Some(summary) = update.summary {
363        task.summary = Some(summary);
364    }
365    if let Some(result_path) = update.result_path {
366        task.result_path = Some(result_path);
367    }
368    if let Some(artifacts) = update.artifacts {
369        task.artifacts = artifacts;
370    }
371    if let Some(error) = update.error {
372        task.error = Some(error);
373    }
374    if let Some(links) = update.links {
375        if links.child_session_id.is_some() {
376            task.links.child_session_id = links.child_session_id;
377        }
378        if links.remote_task_id.is_some() {
379            task.links.remote_task_id = links.remote_task_id;
380        }
381        for id in links.resource_ids {
382            if !task.links.resource_ids.contains(&id) {
383                task.links.resource_ids.push(id);
384            }
385        }
386    }
387    if let Some(worker_id) = update.worker_id {
388        task.worker_id = Some(worker_id);
389    }
390    if let Some(heartbeat_at) = update.heartbeat_at {
391        task.heartbeat_at = Some(heartbeat_at);
392    }
393
394    task.updated_at = now;
395}
396
397/// Build a new task from creation input.
398pub fn new_session_task(input: CreateSessionTask, now: DateTime<Utc>) -> SessionTask {
399    let state = input.state;
400    SessionTask {
401        id: input.id.unwrap_or_else(generate_task_id),
402        session_id: input.session_id,
403        kind: input.kind,
404        display_name: input.display_name,
405        spec: input.spec,
406        state,
407        state_detail: None,
408        progress: None,
409        input_request: None,
410        cancel_requested_at: None,
411        summary: None,
412        result_path: None,
413        artifacts: Vec::new(),
414        error: None,
415        attempt: 1,
416        worker_id: None,
417        heartbeat_at: None,
418        links: input.links,
419        wake_policy: input.wake_policy,
420        created_at: now,
421        started_at: if state == SessionTaskState::Queued {
422            None
423        } else {
424            Some(now)
425        },
426        finished_at: if state.is_terminal() { Some(now) } else { None },
427        updated_at: now,
428    }
429}
430
431// ============================================================================
432// Messages — bidirectional, persisted channel between session and task
433// ============================================================================
434
435/// Direction of a task message. Inbound = session → task.
436#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
437#[cfg_attr(feature = "openapi", derive(ToSchema))]
438#[serde(rename_all = "snake_case")]
439pub enum TaskMessageDirection {
440    Inbound,
441    Outbound,
442}
443
444impl std::fmt::Display for TaskMessageDirection {
445    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446        match self {
447            Self::Inbound => write!(f, "inbound"),
448            Self::Outbound => write!(f, "outbound"),
449        }
450    }
451}
452
453impl From<&str> for TaskMessageDirection {
454    fn from(s: &str) -> Self {
455        match s {
456            "outbound" => Self::Outbound,
457            _ => Self::Inbound,
458        }
459    }
460}
461
462/// One content part of a task message.
463#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
464#[cfg_attr(feature = "openapi", derive(ToSchema))]
465#[serde(tag = "type", rename_all = "snake_case")]
466pub enum TaskMessagePart {
467    Text {
468        text: String,
469    },
470    Data {
471        #[cfg_attr(feature = "openapi", schema(value_type = Object))]
472        data: Value,
473    },
474}
475
476impl TaskMessagePart {
477    pub fn text(text: impl Into<String>) -> Self {
478        Self::Text { text: text.into() }
479    }
480}
481
482/// A message exchanged between a session and one of its tasks.
483#[derive(Debug, Clone, Serialize, Deserialize)]
484#[cfg_attr(feature = "openapi", derive(ToSchema))]
485pub struct TaskMessage {
486    /// `tmsg_*` public ID.
487    pub id: String,
488    pub task_id: String,
489    pub direction: TaskMessageDirection,
490    pub content: Vec<TaskMessagePart>,
491    /// Set when this message answers a `TaskInputRequest`.
492    #[serde(default, skip_serializing_if = "Option::is_none")]
493    pub in_reply_to: Option<String>,
494    pub created_at: DateTime<Utc>,
495}
496
497/// Input for recording a task message.
498#[derive(Debug, Clone, Serialize, Deserialize)]
499pub struct NewTaskMessage {
500    pub direction: TaskMessageDirection,
501    pub content: Vec<TaskMessagePart>,
502    #[serde(default)]
503    pub in_reply_to: Option<String>,
504}
505
506impl NewTaskMessage {
507    pub fn inbound_text(text: impl Into<String>) -> Self {
508        Self {
509            direction: TaskMessageDirection::Inbound,
510            content: vec![TaskMessagePart::text(text)],
511            in_reply_to: None,
512        }
513    }
514
515    pub fn outbound_text(text: impl Into<String>) -> Self {
516        Self {
517            direction: TaskMessageDirection::Outbound,
518            content: vec![TaskMessagePart::text(text)],
519            in_reply_to: None,
520        }
521    }
522}
523
524/// Plain-text rendering of message content (for steering/wake messages).
525pub fn task_message_text(content: &[TaskMessagePart]) -> String {
526    content
527        .iter()
528        .filter_map(|part| match part {
529            TaskMessagePart::Text { text } => Some(text.as_str()),
530            TaskMessagePart::Data { .. } => None,
531        })
532        .collect::<Vec<_>>()
533        .join("\n")
534}
535
536// ============================================================================
537// Registry — owns the record, invariants, events, and durability
538// ============================================================================
539
540/// Session task registry. Implementations emit `task.created` /
541/// `task.updated` (full snapshots) and `task.message.*` events on the owning
542/// session's event stream.
543#[async_trait]
544pub trait SessionTaskRegistry: Send + Sync {
545    /// Create a task (idempotent on caller-supplied ID: re-creating an
546    /// existing ID returns the stored task unchanged).
547    async fn create(&self, input: CreateSessionTask) -> Result<SessionTask>;
548
549    /// Apply a partial update through `apply_task_update` invariants.
550    async fn update(
551        &self,
552        session_id: SessionId,
553        task_id: &str,
554        update: SessionTaskUpdate,
555    ) -> Result<Option<SessionTask>>;
556
557    async fn get(&self, session_id: SessionId, task_id: &str) -> Result<Option<SessionTask>>;
558
559    async fn list(
560        &self,
561        session_id: SessionId,
562        filter: Option<&SessionTaskFilter>,
563    ) -> Result<Vec<SessionTask>>;
564
565    /// Record cooperative cancel intent (idempotent). Does not change state;
566    /// the executor winds down and reports the terminal state.
567    async fn request_cancel(
568        &self,
569        session_id: SessionId,
570        task_id: &str,
571    ) -> Result<Option<SessionTask>>;
572
573    /// Persist a message on the task's channel. Answering messages
574    /// (`in_reply_to` set) clear a matching pending input request and return
575    /// the task to `running`.
576    async fn record_message(
577        &self,
578        session_id: SessionId,
579        task_id: &str,
580        message: NewTaskMessage,
581    ) -> Result<TaskMessage>;
582
583    /// List messages on the task's channel, oldest first.
584    async fn list_messages(
585        &self,
586        session_id: SessionId,
587        task_id: &str,
588        limit: Option<u32>,
589    ) -> Result<Vec<TaskMessage>>;
590}
591
592// ============================================================================
593// Executor — control plane, implemented per kind by capabilities
594// ============================================================================
595
596/// Control plane for a task kind. The registry/tools call into the executor;
597/// the running work pushes into a `TaskSink`.
598///
599/// Default method bodies return `unsupported` so kinds implement only what
600/// applies (e.g. a background tool rarely accepts inbound messages).
601#[async_trait]
602pub trait TaskExecutor: Send + Sync {
603    fn kind(&self) -> &str;
604
605    /// Begin execution, or re-attach after worker loss.
606    async fn start(&self, task: &SessionTask, context: &crate::traits::ToolContext) -> Result<()> {
607        let _ = (task, context);
608        Err(crate::error::AgentLoopError::tool(format!(
609            "task kind '{}' does not support start via the registry",
610            self.kind()
611        )))
612    }
613
614    /// Deliver an inbound message (steering or input answer) to the work.
615    async fn deliver(
616        &self,
617        task: &SessionTask,
618        message: &TaskMessage,
619        context: &crate::traits::ToolContext,
620    ) -> Result<()> {
621        let _ = (task, message, context);
622        Err(crate::error::AgentLoopError::tool(format!(
623            "task kind '{}' does not accept inbound messages",
624            self.kind()
625        )))
626    }
627
628    /// Cooperatively wind down. The task may still end succeeded or failed.
629    async fn cancel(&self, task: &SessionTask, context: &crate::traits::ToolContext) -> Result<()>;
630
631    /// Refresh state for polled kinds (e.g. A2A remote tasks). Reports via
632    /// the registry; no-op by default.
633    async fn reconcile(
634        &self,
635        task: &SessionTask,
636        context: &crate::traits::ToolContext,
637    ) -> Result<()> {
638        let _ = (task, context);
639        Ok(())
640    }
641}
642
643/// Inventory plugin so capabilities register executors without core knowing
644/// about them (same pattern as `SessionSandboxProviderPlugin`).
645pub struct TaskExecutorPlugin {
646    pub executor: fn() -> Arc<dyn TaskExecutor>,
647}
648
649inventory::collect!(TaskExecutorPlugin);
650
651/// Find the registered executor for a task kind.
652pub fn find_task_executor(kind: &str) -> Option<Arc<dyn TaskExecutor>> {
653    inventory::iter::<TaskExecutorPlugin>
654        .into_iter()
655        .map(|plugin| (plugin.executor)())
656        .find(|executor| executor.kind() == kind)
657}
658
659// ============================================================================
660// Sink — report plane for running work
661// ============================================================================
662
663/// Report plane handed to running work. `state`/`progress`/`request_input`
664/// mutate the task record (snapshot events fire); `post` appends to the
665/// message channel; `output` is high-frequency and ephemeral.
666#[async_trait]
667pub trait TaskSink: Send + Sync {
668    async fn state(&self, state: SessionTaskState, detail: Option<String>) -> Result<()>;
669
670    async fn progress(&self, progress: TaskProgress) -> Result<()>;
671
672    /// High-frequency output delta. Not persisted on the task record.
673    async fn output(&self, stream: &str, delta: &str) -> Result<()>;
674
675    /// Outbound message to the session; may wake the parent per wake policy.
676    async fn post(&self, message: NewTaskMessage) -> Result<()>;
677
678    /// Ask the session for input; transitions the task to `awaiting_input`.
679    async fn request_input(&self, request: TaskInputRequest) -> Result<()>;
680
681    async fn artifact(&self, artifact: TaskArtifact) -> Result<()>;
682}
683
684/// `TaskSink` backed by a `SessionTaskRegistry`. Output deltas are dropped
685/// here; kinds with live output keep their existing streaming path.
686pub struct RegistryTaskSink {
687    registry: Arc<dyn SessionTaskRegistry>,
688    session_id: SessionId,
689    task_id: String,
690}
691
692impl RegistryTaskSink {
693    pub fn new(
694        registry: Arc<dyn SessionTaskRegistry>,
695        session_id: SessionId,
696        task_id: String,
697    ) -> Self {
698        Self {
699            registry,
700            session_id,
701            task_id,
702        }
703    }
704}
705
706#[async_trait]
707impl TaskSink for RegistryTaskSink {
708    async fn state(&self, state: SessionTaskState, detail: Option<String>) -> Result<()> {
709        self.registry
710            .update(
711                self.session_id,
712                &self.task_id,
713                SessionTaskUpdate {
714                    state: Some(state),
715                    state_detail: detail,
716                    ..Default::default()
717                },
718            )
719            .await?;
720        Ok(())
721    }
722
723    async fn progress(&self, progress: TaskProgress) -> Result<()> {
724        self.registry
725            .update(
726                self.session_id,
727                &self.task_id,
728                SessionTaskUpdate {
729                    progress: Some(progress),
730                    ..Default::default()
731                },
732            )
733            .await?;
734        Ok(())
735    }
736
737    async fn output(&self, _stream: &str, _delta: &str) -> Result<()> {
738        Ok(())
739    }
740
741    async fn post(&self, message: NewTaskMessage) -> Result<()> {
742        self.registry
743            .record_message(self.session_id, &self.task_id, message)
744            .await?;
745        Ok(())
746    }
747
748    async fn request_input(&self, request: TaskInputRequest) -> Result<()> {
749        self.registry
750            .update(
751                self.session_id,
752                &self.task_id,
753                SessionTaskUpdate {
754                    input_request: Some(request),
755                    ..Default::default()
756                },
757            )
758            .await?;
759        Ok(())
760    }
761
762    async fn artifact(&self, artifact: TaskArtifact) -> Result<()> {
763        let Some(task) = self.registry.get(self.session_id, &self.task_id).await? else {
764            return Ok(());
765        };
766        let mut artifacts = task.artifacts;
767        artifacts.push(artifact);
768        self.registry
769            .update(
770                self.session_id,
771                &self.task_id,
772                SessionTaskUpdate {
773                    artifacts: Some(artifacts),
774                    ..Default::default()
775                },
776            )
777            .await?;
778        Ok(())
779    }
780}
781
782/// VFS directory for a task's result and logs.
783pub fn task_vfs_dir(task_id: &str) -> String {
784    format!("/.tasks/{task_id}")
785}
786
787/// VFS path for a task's machine result.
788pub fn task_result_path(task_id: &str) -> String {
789    format!("/.tasks/{task_id}/result.json")
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795
796    fn task() -> SessionTask {
797        new_session_task(
798            CreateSessionTask {
799                session_id: SessionId::new(),
800                id: None,
801                kind: TASK_KIND_BACKGROUND_TOOL.to_string(),
802                display_name: "Test".to_string(),
803                spec: serde_json::json!({}),
804                state: SessionTaskState::Queued,
805                links: TaskLinks::default(),
806                wake_policy: TaskWakePolicy::Silent,
807            },
808            Utc::now(),
809        )
810    }
811
812    #[test]
813    fn create_generates_prefixed_id() {
814        let t = task();
815        assert!(t.id.starts_with("task_"));
816        assert_eq!(t.state, SessionTaskState::Queued);
817        assert!(t.started_at.is_none());
818    }
819
820    #[test]
821    fn first_transition_out_of_queued_stamps_started_at() {
822        let mut t = task();
823        let now = Utc::now();
824        apply_task_update(
825            &mut t,
826            SessionTaskUpdate {
827                state: Some(SessionTaskState::Running),
828                ..Default::default()
829            },
830            now,
831        );
832        assert_eq!(t.state, SessionTaskState::Running);
833        assert_eq!(t.started_at, Some(now));
834        assert!(t.finished_at.is_none());
835    }
836
837    #[test]
838    fn terminal_transition_stamps_finished_at_and_is_final() {
839        let mut t = task();
840        let now = Utc::now();
841        apply_task_update(
842            &mut t,
843            SessionTaskUpdate {
844                state: Some(SessionTaskState::Succeeded),
845                summary: Some("done".to_string()),
846                ..Default::default()
847            },
848            now,
849        );
850        assert_eq!(t.state, SessionTaskState::Succeeded);
851        assert_eq!(t.finished_at, Some(now));
852
853        // An update carrying a *different* state lost a race against the
854        // terminal transition — it is ignored entirely, content included
855        // (e.g. the reaper must not stamp an orphaned error on a task that
856        // succeeded meanwhile).
857        apply_task_update(
858            &mut t,
859            SessionTaskUpdate {
860                state: Some(SessionTaskState::Failed),
861                error: Some(TaskError {
862                    kind: "orphaned".to_string(),
863                    message: "stale".to_string(),
864                }),
865                ..Default::default()
866            },
867            Utc::now(),
868        );
869        assert_eq!(t.state, SessionTaskState::Succeeded);
870        assert!(t.error.is_none());
871
872        // Idempotent re-mirrors with the SAME terminal state still enrich.
873        apply_task_update(
874            &mut t,
875            SessionTaskUpdate {
876                state: Some(SessionTaskState::Succeeded),
877                result_path: Some("/.tasks/x/result.json".to_string()),
878                ..Default::default()
879            },
880            Utc::now(),
881        );
882        assert_eq!(t.result_path.as_deref(), Some("/.tasks/x/result.json"));
883
884        // Content-only updates (no state) also still apply.
885        apply_task_update(
886            &mut t,
887            SessionTaskUpdate {
888                summary: Some("enriched".to_string()),
889                ..Default::default()
890            },
891            Utc::now(),
892        );
893        assert_eq!(t.summary.as_deref(), Some("enriched"));
894    }
895
896    #[test]
897    fn input_request_forces_awaiting_input_and_clears_on_resume() {
898        let mut t = task();
899        apply_task_update(
900            &mut t,
901            SessionTaskUpdate {
902                input_request: Some(TaskInputRequest {
903                    id: "req_1".to_string(),
904                    prompt: "Approve?".to_string(),
905                    expected: None,
906                }),
907                ..Default::default()
908            },
909            Utc::now(),
910        );
911        assert_eq!(t.state, SessionTaskState::AwaitingInput);
912        assert!(t.input_request.is_some());
913
914        apply_task_update(
915            &mut t,
916            SessionTaskUpdate {
917                state: Some(SessionTaskState::Running),
918                ..Default::default()
919            },
920            Utc::now(),
921        );
922        assert_eq!(t.state, SessionTaskState::Running);
923        assert!(t.input_request.is_none());
924    }
925
926    #[test]
927    fn links_merge_without_duplicates() {
928        let mut t = task();
929        let child = SessionId::new();
930        apply_task_update(
931            &mut t,
932            SessionTaskUpdate {
933                links: Some(TaskLinks {
934                    child_session_id: Some(child),
935                    remote_task_id: None,
936                    resource_ids: vec!["res_1".to_string()],
937                }),
938                ..Default::default()
939            },
940            Utc::now(),
941        );
942        apply_task_update(
943            &mut t,
944            SessionTaskUpdate {
945                links: Some(TaskLinks {
946                    child_session_id: None,
947                    remote_task_id: Some("rt_1".to_string()),
948                    resource_ids: vec!["res_1".to_string(), "res_2".to_string()],
949                }),
950                ..Default::default()
951            },
952            Utc::now(),
953        );
954        assert_eq!(t.links.child_session_id, Some(child));
955        assert_eq!(t.links.remote_task_id.as_deref(), Some("rt_1"));
956        assert_eq!(t.links.resource_ids, vec!["res_1", "res_2"]);
957    }
958
959    #[test]
960    fn message_text_rendering() {
961        let msg = NewTaskMessage::outbound_text("hello");
962        assert_eq!(task_message_text(&msg.content), "hello");
963    }
964}