Skip to main content

everruns_core/
session_task.rs

1// Session tasks — unified registry of background work owned by a session.
2//
3// See specs/session-tasks.md. A task is any asynchronous work a session owns
4// (subagent, external A2A agent, background tool, monitor). The registry owns
5// the record, lifecycle invariants, and task.* events; capabilities plug in
6// `TaskExecutor`s (control plane) and report through `TaskSink` (report plane).
7//
8// Decision: lifecycle invariants live in `apply_task_update` so every backend
9// (PostgreSQL, in-memory, gRPC) applies identical semantics.
10// Decision: kind is a free-form string for extensibility — no enum.
11// Decision: cancellation is cooperative — `request_cancel` records intent via
12// `cancel_requested_at`; executors wind down and report the terminal state.
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use std::sync::Arc;
19
20use crate::error::Result;
21use crate::typed_id::SessionId;
22
23#[cfg(feature = "openapi")]
24use utoipa::ToSchema;
25
/// Progress payload reported by session tasks.
///
/// An alias of `crate::background::BackgroundProgress`, so task progress and
/// background tool execution progress share one shape.
pub type TaskProgress = crate::background::BackgroundProgress;
28
// Well-known task kinds. `SessionTask::kind` stays a free-form string; these
// constants only name the kinds served by the built-in executors.

/// Background tool execution (its progress shape is `TaskProgress`).
pub const TASK_KIND_BACKGROUND_TOOL: &str = "background_tool";
/// Task wrapping an external agent's protocol task (e.g. A2A).
pub const TASK_KIND_EXTERNAL_AGENT: &str = "external_agent";
/// Long-lived monitor task linked to a session schedule. Stays `running`
/// until the linked schedule is exhausted (one-shot) or `cancel_task` is called.
pub const TASK_KIND_MONITOR: &str = "monitor";
/// Subagent task; its full transcript lives in the linked child session.
pub const TASK_KIND_SUBAGENT: &str = "subagent";
37
38/// Generate a new task ID (`task_` prefix per specs/id-schema.md).
39pub fn generate_task_id() -> String {
40    format!("task_{}", uuid::Uuid::now_v7().simple())
41}
42
43/// Generate a new task message ID.
44pub fn generate_task_message_id() -> String {
45    format!("tmsg_{}", uuid::Uuid::now_v7().simple())
46}
47
/// Lifecycle state of a session task.
///
/// States fall into three classes:
/// - active: `queued`, `running`;
/// - interrupted: `awaiting_input` (resumable once the pending input request
///   is answered);
/// - terminal: `succeeded`, `failed`, `canceled` (see `is_terminal`).
///
/// Timeout and rejection are not states. They are reported as `error.kind`
/// values on a `failed` task. The serde form is `snake_case`, the same
/// spelling that `Display` renders and `parse` accepts.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum SessionTaskState {
    /// Created but not yet started (`started_at` is unset).
    Queued,
    /// Work is in progress.
    Running,
    /// Paused on a pending `TaskInputRequest`.
    AwaitingInput,
    /// Terminal: finished successfully.
    Succeeded,
    /// Terminal: finished with an error (see `SessionTask::error`).
    Failed,
    /// Terminal: canceled. Cancellation is cooperative, so the executor
    /// reports this state after a cancel request.
    Canceled,
}
64
65impl SessionTaskState {
66    pub fn is_terminal(&self) -> bool {
67        matches!(self, Self::Succeeded | Self::Failed | Self::Canceled)
68    }
69
70    /// Strict parser for caller-supplied state strings (API filters, tool
71    /// arguments). Unlike `From<&str>` — which exists for trusted,
72    /// CHECK-constrained storage values and defaults to `Queued` — this
73    /// returns None for unknown input so callers can reject it.
74    pub fn parse(s: &str) -> Option<Self> {
75        match s {
76            "queued" => Some(Self::Queued),
77            "running" => Some(Self::Running),
78            "awaiting_input" => Some(Self::AwaitingInput),
79            "succeeded" => Some(Self::Succeeded),
80            "failed" => Some(Self::Failed),
81            "canceled" => Some(Self::Canceled),
82            _ => None,
83        }
84    }
85}
86
87impl std::fmt::Display for SessionTaskState {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        let s = match self {
90            Self::Queued => "queued",
91            Self::Running => "running",
92            Self::AwaitingInput => "awaiting_input",
93            Self::Succeeded => "succeeded",
94            Self::Failed => "failed",
95            Self::Canceled => "canceled",
96        };
97        write!(f, "{s}")
98    }
99}
100
101impl From<&str> for SessionTaskState {
102    fn from(s: &str) -> Self {
103        match s {
104            "running" => Self::Running,
105            "awaiting_input" => Self::AwaitingInput,
106            "succeeded" => Self::Succeeded,
107            "failed" => Self::Failed,
108            "canceled" => Self::Canceled,
109            _ => Self::Queued,
110        }
111    }
112}
113
/// When outbound task activity wakes the owning session's agent.
///
/// Stored per task as `SessionTask::wake_policy`. `Silent` is the `Default`,
/// which also applies when the field is absent in serialized input.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum TaskWakePolicy {
    /// Never wake; the agent polls via `get_task`/`list_tasks`.
    #[default]
    Silent,
    /// Wake on transition to a terminal state.
    OnTerminal,
    /// Wake on any outbound message or input request, and on terminal states.
    OnActivity,
}
127
/// Structured ask posted by a task that needs input to continue.
///
/// Applying one through `SessionTaskUpdate::input_request` moves a
/// non-terminal task to `awaiting_input`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct TaskInputRequest {
    /// Stable ID referenced by the answering message's `in_reply_to`.
    pub id: String,
    /// Human/agent-readable prompt.
    pub prompt: String,
    /// Optional machine-readable description of the expected answer.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[cfg_attr(feature = "openapi", schema(value_type = Object))]
    pub expected: Option<Value>,
}
141
/// Terminal error detail. Timeout/rejection/orphaned are kinds, not states.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct TaskError {
    /// Machine-readable category of the failure, e.g. a timeout, a rejection
    /// or an orphaned task. Free-form string.
    pub kind: String,
    /// Human-readable description of the failure.
    pub message: String,
}
149
/// Typed link to something the task produced.
///
/// Either `path` (session VFS) or `url` (external) locates the artifact. The
/// struct itself does not enforce which one, or whether both, is set.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct TaskArtifact {
    /// Display name of the artifact.
    pub name: String,
    /// Artifact type: "file", "url", "session", "pr", etc.
    /// Serialized under the key `type`.
    #[serde(rename = "type")]
    pub artifact_type: String,
    /// Session VFS path, when the artifact lives in the session filesystem.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
    /// External URL, when the artifact lives elsewhere.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub url: Option<String>,
}
165
/// Cross-references owned by a task.
///
/// `apply_task_update` merges these field by field: set IDs overwrite and
/// new resource IDs are appended without duplicates. All fields are omitted
/// from serialized output when empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct TaskLinks {
    /// Child session, for subagent-shaped tasks. Full transcript lives there.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
    pub child_session_id: Option<SessionId>,
    /// Remote task ID, for tasks wrapping an external protocol task (A2A).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub remote_task_id: Option<String>,
    /// Session resources (sandboxes, browser sessions) this task holds.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub resource_ids: Vec<String>,
}
181
182impl TaskLinks {
183    pub fn is_empty(&self) -> bool {
184        self.child_session_id.is_none()
185            && self.remote_task_id.is_none()
186            && self.resource_ids.is_empty()
187    }
188}
189
/// A unit of background work owned by a session.
///
/// Built by `new_session_task` and mutated only through `apply_task_update`,
/// which enforces the lifecycle invariants. Optional fields and empty
/// collections are omitted from serialized output.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct SessionTask {
    /// `task_*` public ID.
    pub id: String,
    /// Owning session.
    #[cfg_attr(feature = "openapi", schema(value_type = String))]
    pub session_id: SessionId,
    /// Task kind: "subagent", "external_agent", "background_tool", "monitor", …
    pub kind: String,
    /// Human-readable label.
    pub display_name: String,
    /// Kind-specific input (instructions, tool args, external agent id).
    #[serde(default)]
    #[cfg_attr(feature = "openapi", schema(value_type = Object))]
    pub spec: Value,
    /// Current lifecycle state.
    pub state: SessionTaskState,
    /// Short live status line ("polling remote task", "iteration 4/10").
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub state_detail: Option<String>,
    /// Latest reported progress.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub progress: Option<TaskProgress>,
    /// Pending ask while `awaiting_input`; cleared when the task moves to any
    /// other state.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input_request: Option<TaskInputRequest>,
    /// Cooperative cancel intent. A flag, not a state.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cancel_requested_at: Option<DateTime<Utc>>,
    /// Human-readable outcome.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub summary: Option<String>,
    /// Machine result in the session VFS: `/.tasks/{task_id}/result.json`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub result_path: Option<String>,
    /// Things the task produced (files, URLs, sessions, PRs, …).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub artifacts: Vec<TaskArtifact>,
    /// Error detail, normally set alongside `failed`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<TaskError>,
    /// Execution attempt, starting at 1. Bumped by updates carrying
    /// `increment_attempt`: on re-attach, or when the reaper fails an
    /// orphan. Writes fenced on an older attempt are then ignored.
    #[serde(default = "default_attempt")]
    pub attempt: i32,
    /// Worker identifier last reported through `SessionTaskUpdate::worker_id`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub worker_id: Option<String>,
    /// Last liveness heartbeat reported for the task.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub heartbeat_at: Option<DateTime<Utc>>,
    /// Cross-references (child session, remote task, held resources).
    #[serde(default, skip_serializing_if = "TaskLinks::is_empty")]
    pub links: TaskLinks,
    /// When task activity wakes the owning session's agent.
    #[serde(default)]
    pub wake_policy: TaskWakePolicy,
    /// Creation time.
    pub created_at: DateTime<Utc>,
    /// Set once, when the task first leaves `queued` (or at creation if it
    /// was created in a later state).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub started_at: Option<DateTime<Utc>>,
    /// Set once, when the task enters a terminal state.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finished_at: Option<DateTime<Utc>>,
    /// Time of the last applied update.
    pub updated_at: DateTime<Utc>,
}

/// Serde default for `SessionTask::attempt` when the field is absent: the
/// first attempt.
fn default_attempt() -> i32 {
    1
}
251
/// Input for creating a task; turned into a `SessionTask` by
/// `new_session_task`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSessionTask {
    /// Owning session.
    pub session_id: SessionId,
    /// Caller-supplied ID for idempotent creation; generated when None.
    #[serde(default)]
    pub id: Option<String>,
    /// Task kind (see the `TASK_KIND_*` constants; free-form).
    pub kind: String,
    /// Human-readable label.
    pub display_name: String,
    /// Kind-specific input; JSON null when absent.
    #[serde(default)]
    pub spec: Value,
    /// Initial state; defaults to Queued. A non-queued initial state stamps
    /// `started_at` at creation, and a terminal one also stamps `finished_at`.
    #[serde(default = "default_queued")]
    pub state: SessionTaskState,
    /// Initial cross-references.
    #[serde(default)]
    pub links: TaskLinks,
    /// Wake policy; `Silent` when absent.
    #[serde(default)]
    pub wake_policy: TaskWakePolicy,
}

/// Serde default for `CreateSessionTask::state`: new tasks start queued.
fn default_queued() -> SessionTaskState {
    SessionTaskState::Queued
}
275
276/// Partial update applied through `apply_task_update`. None = unchanged.
277#[derive(Debug, Clone, Default, Serialize, Deserialize)]
278pub struct SessionTaskUpdate {
279    pub state: Option<SessionTaskState>,
280    pub state_detail: Option<String>,
281    pub progress: Option<TaskProgress>,
282    /// Setting an input request implies `awaiting_input`.
283    pub input_request: Option<TaskInputRequest>,
284    pub summary: Option<String>,
285    pub result_path: Option<String>,
286    /// Replaces the artifact list when set.
287    pub artifacts: Option<Vec<TaskArtifact>>,
288    pub error: Option<TaskError>,
289    /// Merged field-by-field into existing links.
290    pub links: Option<TaskLinks>,
291    pub worker_id: Option<String>,
292    /// Liveness heartbeat timestamp.
293    pub heartbeat_at: Option<DateTime<Utc>>,
294    /// Stale-attempt fence: when set, the update is silently ignored if
295    /// `task.attempt != expected_attempt`. Executors and sinks set this to
296    /// the attempt they captured at start; the reaper bumps `attempt` (via
297    /// `increment_attempt`) when it fails an orphan, so a zombie executor's
298    /// later writes are rejected. Writers that do not track attempts
299    /// (e.g. `cancel_task` from the API) leave this None.
300    #[serde(default, skip_serializing_if = "Option::is_none")]
301    pub expected_attempt: Option<i32>,
302    /// Supersede the current attempt: bumps `task.attempt` so writes fenced
303    /// on the previous attempt are rejected from now on. Set by the reaper
304    /// when it fails an orphaned task. Ignored if the update itself is
305    /// dropped by the fence or the terminal-state invariant.
306    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
307    pub increment_attempt: bool,
308}
309
/// Optional filter for `SessionTaskRegistry::list`.
///
/// NOTE(review): presumably a `None` field imposes no constraint and set
/// fields are combined with AND. Confirm in each registry backend.
#[derive(Debug, Clone, Default)]
pub struct SessionTaskFilter {
    /// Only tasks of this kind.
    pub kind: Option<String>,
    /// Only tasks currently in this state.
    pub state: Option<SessionTaskState>,
}
316
317/// Apply a partial update to a task, enforcing lifecycle invariants.
318///
319/// All registry backends route updates through this function so semantics
320/// stay identical across PostgreSQL, in-memory, and gRPC modes:
321/// - terminal states are final: state changes on a terminal task are ignored
322///   (content fields like summary/result still apply);
323/// - first transition out of `queued` stamps `started_at`;
324/// - transition into a terminal state stamps `finished_at`;
325/// - setting `input_request` forces `awaiting_input`; leaving
326///   `awaiting_input` clears it.
327pub fn apply_task_update(task: &mut SessionTask, update: SessionTaskUpdate, now: DateTime<Utc>) {
328    // Stale-attempt fence: if the update carries an attempt expectation and it
329    // does not match the current attempt, this write came from a superseded
330    // executor — ignore it entirely (heartbeats, state changes, everything).
331    if let Some(expected) = update.expected_attempt
332        && expected != task.attempt
333    {
334        return;
335    }
336
337    let was_terminal = task.state.is_terminal();
338
339    // Terminal states are final. An update that asks for a *different* state
340    // on an already-terminal task lost a race (e.g. the reaper marking a task
341    // orphaned after it succeeded) — ignore it entirely so its content fields
342    // (error, summary) cannot corrupt the terminal record. Updates that carry
343    // the same terminal state (idempotent re-mirrors) or no state at all
344    // (content enrichment) still apply below.
345    if was_terminal
346        && let Some(state) = update.state
347        && state != task.state
348    {
349        return;
350    }
351
352    // Supersede the current attempt (reaper failing an orphan): writes fenced
353    // on the old attempt are rejected from here on.
354    if update.increment_attempt {
355        task.attempt += 1;
356    }
357
358    let mut next_state = update.state;
359    if update.input_request.is_some() && !was_terminal {
360        next_state = Some(SessionTaskState::AwaitingInput);
361    }
362
363    if let Some(input_request) = update.input_request
364        && !was_terminal
365    {
366        task.input_request = Some(input_request);
367    }
368
369    if let Some(state) = next_state
370        && !was_terminal
371        && task.state != state
372    {
373        if task.state == SessionTaskState::Queued && state != SessionTaskState::Queued {
374            task.started_at.get_or_insert(now);
375        }
376        if state.is_terminal() {
377            task.finished_at.get_or_insert(now);
378        }
379        if state != SessionTaskState::AwaitingInput {
380            task.input_request = None;
381        }
382        task.state = state;
383    }
384
385    if let Some(detail) = update.state_detail {
386        task.state_detail = Some(detail);
387    }
388    if let Some(progress) = update.progress {
389        task.progress = Some(progress);
390    }
391    if let Some(summary) = update.summary {
392        task.summary = Some(summary);
393    }
394    if let Some(result_path) = update.result_path {
395        task.result_path = Some(result_path);
396    }
397    if let Some(artifacts) = update.artifacts {
398        task.artifacts = artifacts;
399    }
400    if let Some(error) = update.error {
401        task.error = Some(error);
402    }
403    if let Some(links) = update.links {
404        if links.child_session_id.is_some() {
405            task.links.child_session_id = links.child_session_id;
406        }
407        if links.remote_task_id.is_some() {
408            task.links.remote_task_id = links.remote_task_id;
409        }
410        for id in links.resource_ids {
411            if !task.links.resource_ids.contains(&id) {
412                task.links.resource_ids.push(id);
413            }
414        }
415    }
416    if let Some(worker_id) = update.worker_id {
417        task.worker_id = Some(worker_id);
418    }
419    if let Some(heartbeat_at) = update.heartbeat_at {
420        task.heartbeat_at = Some(heartbeat_at);
421    }
422
423    task.updated_at = now;
424}
425
426/// Build a new task from creation input.
427pub fn new_session_task(input: CreateSessionTask, now: DateTime<Utc>) -> SessionTask {
428    let state = input.state;
429    SessionTask {
430        id: input.id.unwrap_or_else(generate_task_id),
431        session_id: input.session_id,
432        kind: input.kind,
433        display_name: input.display_name,
434        spec: input.spec,
435        state,
436        state_detail: None,
437        progress: None,
438        input_request: None,
439        cancel_requested_at: None,
440        summary: None,
441        result_path: None,
442        artifacts: Vec::new(),
443        error: None,
444        attempt: 1,
445        worker_id: None,
446        heartbeat_at: None,
447        links: input.links,
448        wake_policy: input.wake_policy,
449        created_at: now,
450        started_at: if state == SessionTaskState::Queued {
451            None
452        } else {
453            Some(now)
454        },
455        finished_at: if state.is_terminal() { Some(now) } else { None },
456        updated_at: now,
457    }
458}
459
460// ============================================================================
461// Messages — bidirectional, persisted channel between session and task
462// ============================================================================
463
/// Direction of a task message. Inbound = session → task.
///
/// Serialized in `snake_case` (`inbound` / `outbound`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum TaskMessageDirection {
    /// Session → task (steering or an input answer).
    Inbound,
    /// Task → session.
    Outbound,
}
472
473impl std::fmt::Display for TaskMessageDirection {
474    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475        match self {
476            Self::Inbound => write!(f, "inbound"),
477            Self::Outbound => write!(f, "outbound"),
478        }
479    }
480}
481
482impl From<&str> for TaskMessageDirection {
483    fn from(s: &str) -> Self {
484        match s {
485            "outbound" => Self::Outbound,
486            _ => Self::Inbound,
487        }
488    }
489}
490
/// One content part of a task message.
///
/// Internally tagged: serialized with a `type` field of `"text"` or `"data"`
/// next to the variant's own field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TaskMessagePart {
    /// Plain text; the only part kind `task_message_text` renders.
    Text {
        text: String,
    },
    /// Arbitrary structured JSON payload.
    Data {
        #[cfg_attr(feature = "openapi", schema(value_type = Object))]
        data: Value,
    },
}
504
505impl TaskMessagePart {
506    pub fn text(text: impl Into<String>) -> Self {
507        Self::Text { text: text.into() }
508    }
509}
510
/// A message exchanged between a session and one of its tasks, as persisted
/// on the task's channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct TaskMessage {
    /// `tmsg_*` public ID.
    pub id: String,
    /// Task whose channel carries this message.
    pub task_id: String,
    /// Inbound (session → task) or outbound (task → session).
    pub direction: TaskMessageDirection,
    /// Ordered content parts.
    pub content: Vec<TaskMessagePart>,
    /// Set when this message answers a `TaskInputRequest`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub in_reply_to: Option<String>,
    /// Time the message was recorded.
    pub created_at: DateTime<Utc>,
}
525
/// Input for recording a task message via
/// `SessionTaskRegistry::record_message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewTaskMessage {
    /// Inbound (session → task) or outbound (task → session).
    pub direction: TaskMessageDirection,
    /// Ordered content parts.
    pub content: Vec<TaskMessagePart>,
    /// ID of the `TaskInputRequest` this message answers, if any.
    #[serde(default)]
    pub in_reply_to: Option<String>,
    /// Stale-attempt fence for message writes: when set, registries reject
    /// the message if `task.attempt` no longer matches, so a superseded
    /// executor cannot append to the thread or trigger wake-ups. Not stored
    /// with the message.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expected_attempt: Option<i32>,
}
540
541impl NewTaskMessage {
542    pub fn inbound_text(text: impl Into<String>) -> Self {
543        Self {
544            direction: TaskMessageDirection::Inbound,
545            content: vec![TaskMessagePart::text(text)],
546            in_reply_to: None,
547            expected_attempt: None,
548        }
549    }
550
551    pub fn outbound_text(text: impl Into<String>) -> Self {
552        Self {
553            direction: TaskMessageDirection::Outbound,
554            content: vec![TaskMessagePart::text(text)],
555            in_reply_to: None,
556            expected_attempt: None,
557        }
558    }
559
560    /// Fence this message write on the given attempt (see `expected_attempt`).
561    pub fn with_expected_attempt(mut self, attempt: i32) -> Self {
562        self.expected_attempt = Some(attempt);
563        self
564    }
565}
566
567/// Plain-text rendering of message content (for steering/wake messages).
568pub fn task_message_text(content: &[TaskMessagePart]) -> String {
569    content
570        .iter()
571        .filter_map(|part| match part {
572            TaskMessagePart::Text { text } => Some(text.as_str()),
573            TaskMessagePart::Data { .. } => None,
574        })
575        .collect::<Vec<_>>()
576        .join("\n")
577}
578
579// ============================================================================
580// Registry — owns the record, invariants, events, and durability
581// ============================================================================
582
/// Session task registry. Implementations emit `task.created` /
/// `task.updated` (full snapshots) and `task.message.*` events on the owning
/// session's event stream.
///
/// Every lookup is scoped by `session_id` as well as `task_id`.
#[async_trait]
pub trait SessionTaskRegistry: Send + Sync {
    /// Create a task (idempotent on caller-supplied ID: re-creating an
    /// existing ID returns the stored task unchanged).
    async fn create(&self, input: CreateSessionTask) -> Result<SessionTask>;

    /// Apply a partial update through `apply_task_update` invariants.
    ///
    /// Returns the resulting task. NOTE(review): `None` presumably means no
    /// such task in this session; confirm per backend. Fenced or
    /// terminal-conflicting updates are discarded by `apply_task_update`
    /// rather than reported as errors.
    async fn update(
        &self,
        session_id: SessionId,
        task_id: &str,
        update: SessionTaskUpdate,
    ) -> Result<Option<SessionTask>>;

    /// Fetch one task; `None` presumably when it does not exist in this session.
    async fn get(&self, session_id: SessionId, task_id: &str) -> Result<Option<SessionTask>>;

    /// List the session's tasks, optionally narrowed by `filter`.
    async fn list(
        &self,
        session_id: SessionId,
        filter: Option<&SessionTaskFilter>,
    ) -> Result<Vec<SessionTask>>;

    /// Record cooperative cancel intent (idempotent) in
    /// `cancel_requested_at`. Does not change state; the executor winds down
    /// and reports the terminal state.
    async fn request_cancel(
        &self,
        session_id: SessionId,
        task_id: &str,
    ) -> Result<Option<SessionTask>>;

    /// Persist a message on the task's channel. Answering messages
    /// (`in_reply_to` set) clear a matching pending input request and return
    /// the task to `running`. Messages carrying `expected_attempt` are
    /// rejected when the task's attempt no longer matches. NOTE(review):
    /// whether rejection surfaces as an `Err` depends on the backend.
    async fn record_message(
        &self,
        session_id: SessionId,
        task_id: &str,
        message: NewTaskMessage,
    ) -> Result<TaskMessage>;

    /// List messages on the task's channel, oldest first.
    ///
    /// When `after_id` is `Some`, only messages newer than that message ID are
    /// returned (exclusive cursor, since_id-style). Both postgres and in-memory
    /// backends implement the cursor; other backends ignore it and return all
    /// messages up to `limit`.
    async fn list_messages(
        &self,
        session_id: SessionId,
        task_id: &str,
        limit: Option<u32>,
        after_id: Option<&str>,
    ) -> Result<Vec<TaskMessage>>;
}
640
641// ============================================================================
642// Executor — control plane, implemented per kind by capabilities
643// ============================================================================
644
645/// Control plane for a task kind. The registry/tools call into the executor;
646/// the running work pushes into a `TaskSink`.
647///
648/// Default method bodies return `unsupported` so kinds implement only what
649/// applies (e.g. a background tool rarely accepts inbound messages).
650#[async_trait]
651pub trait TaskExecutor: Send + Sync {
652    fn kind(&self) -> &str;
653
654    /// Whether this executor can re-attach to a running task after worker loss.
655    ///
656    /// Kinds returning `true` must implement `start` such that calling it with
657    /// a re-attached task snapshot (attempt already bumped by the reaper)
658    /// resumes the work idempotently and heartbeats with the new attempt.
659    /// Kinds returning `false` (the default) are failed as orphaned immediately
660    /// by the reaper.
661    fn can_reattach(&self) -> bool {
662        false
663    }
664
665    /// Whether this executor can re-attach to a *specific* task instance.
666    ///
667    /// Defaults to `self.can_reattach()`. Override to inspect per-task spec
668    /// fields (e.g. whether the spawned tool declared itself idempotent).
669    /// The reaper calls this instead of `can_reattach()` when a task snapshot
670    /// is available.
671    fn can_reattach_task(&self, task: &SessionTask) -> bool {
672        let _ = task;
673        self.can_reattach()
674    }
675
676    /// Begin execution, or re-attach after worker loss.
677    ///
678    /// Called by the reaper when re-attaching a task (attempt already bumped).
679    /// Implementations must heartbeat using `task.attempt` so stale writes from
680    /// the previous executor are rejected by the fence.
681    async fn start(&self, task: &SessionTask, context: &crate::traits::ToolContext) -> Result<()> {
682        let _ = (task, context);
683        Err(crate::error::AgentLoopError::tool(format!(
684            "task kind '{}' does not support start via the registry",
685            self.kind()
686        )))
687    }
688
689    /// Deliver an inbound message (steering or input answer) to the work.
690    async fn deliver(
691        &self,
692        task: &SessionTask,
693        message: &TaskMessage,
694        context: &crate::traits::ToolContext,
695    ) -> Result<()> {
696        let _ = (task, message, context);
697        Err(crate::error::AgentLoopError::tool(format!(
698            "task kind '{}' does not accept inbound messages",
699            self.kind()
700        )))
701    }
702
703    /// Cooperatively wind down. The task may still end succeeded or failed.
704    async fn cancel(&self, task: &SessionTask, context: &crate::traits::ToolContext) -> Result<()>;
705
706    /// Refresh state for polled kinds (e.g. A2A remote tasks). Reports via
707    /// the registry; no-op by default.
708    async fn reconcile(
709        &self,
710        task: &SessionTask,
711        context: &crate::traits::ToolContext,
712    ) -> Result<()> {
713        let _ = (task, context);
714        Ok(())
715    }
716}
717
/// Inventory plugin so capabilities register executors without core knowing
/// about them (same pattern as `SessionSandboxProviderPlugin`).
///
/// The plugin holds a constructor rather than an instance. `find_task_executor`
/// calls it while searching, so it may run on every lookup.
pub struct TaskExecutorPlugin {
    /// Builds the executor for this plugin's kind.
    pub executor: fn() -> Arc<dyn TaskExecutor>,
}

// Make `TaskExecutorPlugin` an inventory-collected type. Capabilities
// presumably register instances with `inventory::submit!` in their own crates.
inventory::collect!(TaskExecutorPlugin);
725
726/// Find the registered executor for a task kind.
727pub fn find_task_executor(kind: &str) -> Option<Arc<dyn TaskExecutor>> {
728    inventory::iter::<TaskExecutorPlugin>
729        .into_iter()
730        .map(|plugin| (plugin.executor)())
731        .find(|executor| executor.kind() == kind)
732}
733
734// ============================================================================
735// Sink — report plane for running work
736// ============================================================================
737
/// Report plane handed to running work. `state`/`progress`/`request_input`
/// mutate the task record (snapshot events fire); `post` appends to the
/// message channel; `output` is high-frequency and ephemeral.
#[async_trait]
pub trait TaskSink: Send + Sync {
    /// Report a lifecycle state, optionally with a short status line.
    async fn state(&self, state: SessionTaskState, detail: Option<String>) -> Result<()>;

    /// Report a progress snapshot.
    async fn progress(&self, progress: TaskProgress) -> Result<()>;

    /// High-frequency output delta. Not persisted on the task record.
    async fn output(&self, stream: &str, delta: &str) -> Result<()>;

    /// Outbound message to the session; may wake the parent per wake policy.
    async fn post(&self, message: NewTaskMessage) -> Result<()>;

    /// Ask the session for input; transitions the task to `awaiting_input`.
    async fn request_input(&self, request: TaskInputRequest) -> Result<()>;

    /// Record something the task produced.
    async fn artifact(&self, artifact: TaskArtifact) -> Result<()>;
}
758
/// `TaskSink` backed by a `SessionTaskRegistry`. Output deltas are dropped
/// here; kinds with live output keep their existing streaming path.
///
/// Carries `attempt` for stale-attempt fencing: every update includes
/// `expected_attempt` so writes from a superseded executor are rejected once
/// the reaper increments the attempt counter on the task record.
pub struct RegistryTaskSink {
    /// Registry that receives every report.
    registry: Arc<dyn SessionTaskRegistry>,
    /// Session owning the task.
    session_id: SessionId,
    /// Task being reported on.
    task_id: String,
    /// The attempt number this sink was created for (captured at task start).
    /// `new` defaults it to 1; override with `with_attempt`.
    attempt: i32,
}
772
773impl RegistryTaskSink {
774    pub fn new(
775        registry: Arc<dyn SessionTaskRegistry>,
776        session_id: SessionId,
777        task_id: String,
778    ) -> Self {
779        Self {
780            registry,
781            session_id,
782            task_id,
783            attempt: 1,
784        }
785    }
786
787    /// Set the attempt number for fencing. Call this after reading the task
788    /// record at start so the sink rejects writes once the attempt is bumped.
789    pub fn with_attempt(mut self, attempt: i32) -> Self {
790        self.attempt = attempt;
791        self
792    }
793}
794
795#[async_trait]
796impl TaskSink for RegistryTaskSink {
797    async fn state(&self, state: SessionTaskState, detail: Option<String>) -> Result<()> {
798        self.registry
799            .update(
800                self.session_id,
801                &self.task_id,
802                SessionTaskUpdate {
803                    state: Some(state),
804                    state_detail: detail,
805                    expected_attempt: Some(self.attempt),
806                    ..Default::default()
807                },
808            )
809            .await?;
810        Ok(())
811    }
812
813    async fn progress(&self, progress: TaskProgress) -> Result<()> {
814        self.registry
815            .update(
816                self.session_id,
817                &self.task_id,
818                SessionTaskUpdate {
819                    progress: Some(progress),
820                    expected_attempt: Some(self.attempt),
821                    ..Default::default()
822                },
823            )
824            .await?;
825        Ok(())
826    }
827
828    async fn output(&self, _stream: &str, _delta: &str) -> Result<()> {
829        Ok(())
830    }
831
832    async fn post(&self, message: NewTaskMessage) -> Result<()> {
833        // Fence message writes too: record_message emits events and can wake
834        // the parent session, so a superseded executor must not post.
835        self.registry
836            .record_message(
837                self.session_id,
838                &self.task_id,
839                message.with_expected_attempt(self.attempt),
840            )
841            .await?;
842        Ok(())
843    }
844
845    async fn request_input(&self, request: TaskInputRequest) -> Result<()> {
846        self.registry
847            .update(
848                self.session_id,
849                &self.task_id,
850                SessionTaskUpdate {
851                    input_request: Some(request),
852                    expected_attempt: Some(self.attempt),
853                    ..Default::default()
854                },
855            )
856            .await?;
857        Ok(())
858    }
859
860    async fn artifact(&self, artifact: TaskArtifact) -> Result<()> {
861        let Some(task) = self.registry.get(self.session_id, &self.task_id).await? else {
862            return Ok(());
863        };
864        // Check attempt before fetching artifacts to avoid a stale write.
865        if task.attempt != self.attempt {
866            return Ok(());
867        }
868        let mut artifacts = task.artifacts;
869        artifacts.push(artifact);
870        self.registry
871            .update(
872                self.session_id,
873                &self.task_id,
874                SessionTaskUpdate {
875                    artifacts: Some(artifacts),
876                    expected_attempt: Some(self.attempt),
877                    ..Default::default()
878                },
879            )
880            .await?;
881        Ok(())
882    }
883}
884
/// VFS directory holding a task's result and logs: `/.tasks/{task_id}`.
pub fn task_vfs_dir(task_id: &str) -> String {
    format!("/.tasks/{task_id}")
}

/// VFS path of a task's machine-readable result: `result.json` inside the
/// task's VFS directory.
///
/// Derived from [`task_vfs_dir`] so the two paths cannot drift apart.
pub fn task_result_path(task_id: &str) -> String {
    let mut path = task_vfs_dir(task_id);
    path.push_str("/result.json");
    path
}
894
#[cfg(test)]
mod tests {
    use super::*;

    fn task() -> SessionTask {
        new_session_task(
            CreateSessionTask {
                session_id: SessionId::new(),
                id: None,
                kind: TASK_KIND_BACKGROUND_TOOL.to_string(),
                display_name: "Test".to_string(),
                spec: serde_json::json!({}),
                state: SessionTaskState::Queued,
                links: TaskLinks::default(),
                wake_policy: TaskWakePolicy::Silent,
            },
            Utc::now(),
        )
    }

    #[test]
    fn create_generates_prefixed_id() {
        let t = task();
        assert!(t.id.starts_with("task_"));
        assert_eq!(t.state, SessionTaskState::Queued);
        assert!(t.started_at.is_none());
    }

    #[test]
    fn first_transition_out_of_queued_stamps_started_at() {
        let mut t = task();
        let now = Utc::now();
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Running),
                ..Default::default()
            },
            now,
        );
        assert_eq!(t.state, SessionTaskState::Running);
        assert_eq!(t.started_at, Some(now));
        assert!(t.finished_at.is_none());
    }

    #[test]
    fn terminal_transition_stamps_finished_at_and_is_final() {
        let mut t = task();
        let now = Utc::now();
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Succeeded),
                summary: Some("done".to_string()),
                ..Default::default()
            },
            now,
        );
        assert_eq!(t.state, SessionTaskState::Succeeded);
        assert_eq!(t.finished_at, Some(now));

        // An update carrying a *different* state lost a race against the
        // terminal transition — it is ignored entirely, content included
        // (e.g. the reaper must not stamp an orphaned error on a task that
        // succeeded meanwhile).
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Failed),
                error: Some(TaskError {
                    kind: "orphaned".to_string(),
                    message: "stale".to_string(),
                }),
                ..Default::default()
            },
            Utc::now(),
        );
        assert_eq!(t.state, SessionTaskState::Succeeded);
        assert!(t.error.is_none());

        // Idempotent re-mirrors with the SAME terminal state still enrich.
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Succeeded),
                result_path: Some("/.tasks/x/result.json".to_string()),
                ..Default::default()
            },
            Utc::now(),
        );
        assert_eq!(t.result_path.as_deref(), Some("/.tasks/x/result.json"));

        // Content-only updates (no state) also still apply.
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                summary: Some("enriched".to_string()),
                ..Default::default()
            },
            Utc::now(),
        );
        assert_eq!(t.summary.as_deref(), Some("enriched"));
    }

    #[test]
    fn input_request_forces_awaiting_input_and_clears_on_resume() {
        let mut t = task();
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                input_request: Some(TaskInputRequest {
                    id: "req_1".to_string(),
                    prompt: "Approve?".to_string(),
                    expected: None,
                }),
                ..Default::default()
            },
            Utc::now(),
        );
        assert_eq!(t.state, SessionTaskState::AwaitingInput);
        assert!(t.input_request.is_some());

        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Running),
                ..Default::default()
            },
            Utc::now(),
        );
        assert_eq!(t.state, SessionTaskState::Running);
        assert!(t.input_request.is_none());
    }

    #[test]
    fn links_merge_without_duplicates() {
        let mut t = task();
        let child = SessionId::new();
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                links: Some(TaskLinks {
                    child_session_id: Some(child),
                    remote_task_id: None,
                    resource_ids: vec!["res_1".to_string()],
                }),
                ..Default::default()
            },
            Utc::now(),
        );
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                links: Some(TaskLinks {
                    child_session_id: None,
                    remote_task_id: Some("rt_1".to_string()),
                    resource_ids: vec!["res_1".to_string(), "res_2".to_string()],
                }),
                ..Default::default()
            },
            Utc::now(),
        );
        assert_eq!(t.links.child_session_id, Some(child));
        assert_eq!(t.links.remote_task_id.as_deref(), Some("rt_1"));
        assert_eq!(t.links.resource_ids, vec!["res_1", "res_2"]);
    }

    #[test]
    fn message_text_rendering() {
        let msg = NewTaskMessage::outbound_text("hello");
        assert_eq!(task_message_text(&msg.content), "hello");
    }

    #[test]
    fn task_paths_share_vfs_dir() {
        let id = "task_abc";
        assert_eq!(task_vfs_dir(id), "/.tasks/task_abc");
        assert_eq!(task_result_path(id), "/.tasks/task_abc/result.json");
        assert!(task_result_path(id).starts_with(&task_vfs_dir(id)));
    }

    // -------------------------------------------------------------------------
    // Stale-attempt fencing tests
    // -------------------------------------------------------------------------

    #[test]
    fn update_with_matching_attempt_applies() {
        let mut t = task();
        // Task starts at attempt 1.
        assert_eq!(t.attempt, 1);
        let now = Utc::now();

        // An update that carries expected_attempt == task.attempt applies normally.
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Running),
                state_detail: Some("step 1".to_string()),
                heartbeat_at: Some(now),
                expected_attempt: Some(1),
                ..Default::default()
            },
            now,
        );
        assert_eq!(t.state, SessionTaskState::Running);
        assert_eq!(t.state_detail.as_deref(), Some("step 1"));
        assert_eq!(t.heartbeat_at, Some(now));
    }

    #[test]
    fn update_with_stale_attempt_is_fully_ignored() {
        let mut t = task();
        // Simulate the reaper bumping the attempt by directly setting it.
        t.attempt = 2;

        let now = Utc::now();
        let before_updated = t.updated_at;

        // A write from the old executor (attempt = 1) must be fully ignored —
        // state, heartbeat, and updated_at must not change.
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Running),
                state_detail: Some("superseded".to_string()),
                heartbeat_at: Some(now),
                expected_attempt: Some(1),
                ..Default::default()
            },
            now,
        );
        assert_eq!(t.state, SessionTaskState::Queued, "state must be unchanged");
        assert!(t.state_detail.is_none(), "state_detail must be unchanged");
        assert!(t.heartbeat_at.is_none(), "heartbeat must be unchanged");
        assert_eq!(t.updated_at, before_updated, "updated_at must be unchanged");
    }

    #[test]
    fn update_with_none_expected_attempt_applies_regardless() {
        let mut t = task();
        // Even with attempt = 99 and no expected_attempt, the update applies.
        t.attempt = 99;
        let now = Utc::now();

        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                summary: Some("cancel from API".to_string()),
                expected_attempt: None,
                ..Default::default()
            },
            now,
        );
        // Writers that don't track attempts (e.g. cancel_task from the API) still apply.
        assert_eq!(t.summary.as_deref(), Some("cancel from API"));
    }

    #[test]
    fn reaper_update_increments_attempt_and_fences_old_executor() {
        let mut t = task();
        t.state = SessionTaskState::Running;
        assert_eq!(t.attempt, 1);
        let now = Utc::now();

        // Reaper-style update: fail as orphaned and supersede the attempt.
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Failed),
                error: Some(TaskError {
                    kind: "orphaned".to_string(),
                    message: "worker heartbeat stopped".to_string(),
                }),
                increment_attempt: true,
                ..Default::default()
            },
            now,
        );
        assert_eq!(t.state, SessionTaskState::Failed);
        assert_eq!(t.attempt, 2, "orphan reap must supersede the attempt");

        // Snapshot what the zombie write must not touch.
        let heartbeat_before = t.heartbeat_at;
        let updated_before = t.updated_at;

        // The zombie executor's content-only heartbeat (no state change, so the
        // terminal invariant alone would let it through) is now fenced out.
        let later = now + chrono::Duration::seconds(5);
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                heartbeat_at: Some(later),
                expected_attempt: Some(1),
                ..Default::default()
            },
            later,
        );
        assert_eq!(
            t.heartbeat_at, heartbeat_before,
            "stale heartbeat must be rejected"
        );
        assert_eq!(
            t.updated_at, updated_before,
            "stale write must not touch updated_at"
        );
    }

    #[test]
    fn increment_attempt_is_inert_when_update_is_dropped() {
        let mut t = task();
        t.state = SessionTaskState::Succeeded;
        assert_eq!(t.attempt, 1);
        let now = Utc::now();

        // Reaper losing the race against a clean finish: the terminal-state
        // invariant drops the whole update, including the attempt bump.
        apply_task_update(
            &mut t,
            SessionTaskUpdate {
                state: Some(SessionTaskState::Failed),
                error: Some(TaskError {
                    kind: "orphaned".to_string(),
                    message: "worker heartbeat stopped".to_string(),
                }),
                increment_attempt: true,
                ..Default::default()
            },
            now,
        );
        assert_eq!(t.state, SessionTaskState::Succeeded);
        assert!(t.error.is_none(), "dropped update must not stamp an error");
        assert_eq!(t.attempt, 1, "dropped update must not bump the attempt");
    }
}