use crate::types::{CapToken, Role, SessionId, TaskId};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, Notify};
/// String key identifying a resume point.
///
/// It is hashable and is used as the key of `EngineState::pending_resumes`.
/// The inner string is public, so keys are not restricted to the shapes
/// produced by the constructors in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ResumeKey(pub String);
impl ResumeKey {
pub fn new() -> Self {
Self(format!("R-{}", crate::types::uid_hex(12)))
}
pub fn for_senior(task_id: &TaskId) -> Self {
Self(format!("R-senior-{}", task_id.0))
}
}
impl Default for ResumeKey {
fn default() -> Self {
Self::new()
}
}
/// Status recorded on a `TaskState`.
///
/// Serde renames the variants to snake_case (e.g. `Pending` is written as
/// `pending`). Within this file only `Pending` is ever assigned (by
/// `TaskState::new`); transitions to the other variants happen elsewhere.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
/// Status given to every task built by `TaskState::new`.
Pending,
Running,
// NOTE(review): presumably pairs with `TaskState::suspended_on` — confirm.
Suspended,
Pass,
Blocked,
Cancelled,
}
/// Description of a task, stored in `TaskState::spec`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSpec {
/// NOTE(review): presumably the name/identifier of the agent that runs the task — confirm.
pub agent: String,
/// NOTE(review): presumably the first instruction handed to that agent — confirm.
pub initial_directive: String,
}
/// Mutable record kept for one task.
///
/// `TaskState::new` is the constructor in this file; the initial values it
/// assigns are noted on the fields below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskState {
/// Identifier of the task.
pub id: TaskId,
/// Specification the task was created from.
pub spec: TaskSpec,
/// Current status; starts as `TaskStatus::Pending`.
pub status: TaskStatus,
/// Attempt counter; starts at 0.
pub attempt: u32,
/// Resume key associated with the task, if any; starts as `None`.
/// NOTE(review): presumably only set while `status` is `Suspended` — confirm.
pub suspended_on: Option<ResumeKey>,
/// Most recent result value, if any; starts as `None`.
pub last_result: Option<Value>,
/// Value of `crate::types::now_unix()` at construction.
pub created_at: u64,
/// Initially equal to `created_at`.
pub updated_at: u64,
/// Starts at 0; also defaults to 0 when the field is absent from deserialized data.
#[serde(default)]
pub spawn_depth: u32,
}
impl TaskState {
pub fn new(id: TaskId, spec: TaskSpec) -> Self {
let now = crate::types::now_unix();
Self {
id,
spec,
status: TaskStatus::Pending,
attempt: 0,
suspended_on: None,
last_result: None,
created_at: now,
updated_at: now,
spawn_depth: 0,
}
}
}
/// Possible outcomes of a dispatch.
///
/// NOTE(review): the producer of these values is not in this file; the
/// variant meanings below are read off the payload types only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DispatchOutcome {
/// Carries a JSON value.
Pass(Value),
/// Carries a JSON value.
Blocked(Value),
/// Carries the resume key involved in the suspension.
Suspended(ResumeKey),
Cancelled,
Timeout,
}
/// Record kept for one operator session; stored in `EngineState::sessions`
/// keyed by `SessionId`.
///
/// Every field marked `#[serde(default)]` may be missing from deserialized
/// data, in which case it becomes `None` or an empty map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperatorSession {
/// Identifier of this session.
pub id: SessionId,
pub operator_id: String,
pub role: Role,
// NOTE(review): `attached_at` / `last_seen` look like timestamps in the same
// unit as `crate::types::now_unix()` — confirm against the code that sets them.
pub attached_at: u64,
pub last_seen: u64,
pub attached: bool,
/// Ids of tasks associated with this session.
/// NOTE(review): ownership semantics are defined by the callers — confirm.
pub owned_task_ids: Vec<TaskId>,
pub token_nonce: String,
#[serde(default)]
pub operator_kind: Option<crate::core::ctx::OperatorKind>,
/// Operator kinds keyed by a string.
/// NOTE(review): the key is presumably an agent name — confirm.
#[serde(default)]
pub runtime_agent_kinds: HashMap<String, crate::core::ctx::OperatorKind>,
/// Operator kinds keyed by a string.
/// NOTE(review): the meaning of the `bp_` prefix is not visible here — confirm.
#[serde(default)]
pub bp_agent_kinds: HashMap<String, crate::core::ctx::OperatorKind>,
#[serde(default)]
pub bp_global_kind: Option<crate::core::ctx::OperatorKind>,
#[serde(default)]
pub bridge_id: Option<String>,
#[serde(default)]
pub hook_id: Option<String>,
#[serde(default)]
pub operator_backend_id: Option<String>,
}
/// A `CapToken` together with its usage bookkeeping.
///
/// `consume` is the method that enforces `revoked` and `uses_left`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapTokenRecord {
/// The token this record tracks.
pub token: CapToken,
// `uses_left`: remaining uses; `None` means `consume` never runs out. Seeded
// from `token.max_uses` by the constructors.
// `revoked`: when true, `consume` fails with `CapTokenConsumeError::Revoked`.
pub uses_left: Option<u32>, pub revoked: bool,
/// Task associated with the token: set by `from_worker_token`, `None` from
/// `from_token`, and `None` when absent from deserialized data.
#[serde(default)]
pub task_id: Option<TaskId>,
}
impl CapTokenRecord {
pub fn from_token(token: CapToken) -> Self {
Self {
uses_left: token.max_uses,
token,
revoked: false,
task_id: None,
}
}
pub fn from_worker_token(token: CapToken, task_id: TaskId) -> Self {
Self {
uses_left: token.max_uses,
token,
revoked: false,
task_id: Some(task_id),
}
}
pub fn consume(&mut self) -> Result<(), CapTokenConsumeError> {
if self.revoked {
return Err(CapTokenConsumeError::Revoked);
}
match self.uses_left.as_mut() {
None => Ok(()),
Some(0) => Err(CapTokenConsumeError::Exhausted),
Some(n) => {
*n -= 1;
Ok(())
}
}
}
}
/// Reasons `CapTokenRecord::consume` can refuse to spend a use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum CapTokenConsumeError {
/// The record's `revoked` flag is set.
#[error("token revoked")]
Revoked,
/// The record's `uses_left` is `Some(0)`.
#[error("token uses exhausted")]
Exhausted,
}
/// Events recorded in `EngineState::event_log_tail` and carried by `EventStream`.
///
/// Serde encodes each event as an internally tagged object: the variant name,
/// converted to snake_case, is stored under the `kind` key next to the
/// variant's own fields (e.g. `{"kind": "task_created", "task_id": ...}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Event {
/// Carries the session id and the role it attached with.
SessionAttached {
session_id: SessionId,
role: Role,
},
/// Carries the id of the detached session.
SessionDetached {
session_id: SessionId,
},
/// Carries the id of the created task.
TaskCreated {
task_id: TaskId,
},
/// Carries the task id and the attempt number that started.
TaskAttemptStarted {
task_id: TaskId,
attempt: u32,
},
/// Carries the task id, the attempt number and that attempt's result value.
TaskAttemptCompleted {
task_id: TaskId,
attempt: u32,
result: Value,
},
/// Carries the task id and a result value.
TaskPass {
task_id: TaskId,
result: Value,
},
/// Carries the task id and a result value.
TaskBlocked {
task_id: TaskId,
result: Value,
},
/// Carries one worker output event together with the task id and attempt
/// number it belongs to.
WorkerOutput {
task_id: TaskId,
attempt: u32,
event: crate::worker::output::OutputEvent,
},
/// Carries the task id and the resume key it is suspended on.
TaskSuspended {
task_id: TaskId,
key: ResumeKey,
},
/// Carries the task id and the resume key that was resumed.
TaskResumed {
task_id: TaskId,
key: ResumeKey,
},
/// Carries the id of the cancelled task.
TaskCancelled {
task_id: TaskId,
},
/// Carries the task id and the question value.
SeniorQueried {
task_id: TaskId,
question: Value,
},
/// Carries the task id and the answer value.
SeniorAnswered {
task_id: TaskId,
answer: Value,
},
}
/// Receiving half of a tokio broadcast channel whose messages are `Event`s.
pub type EventStream = broadcast::Receiver<Event>;
/// Entry stored in `EngineState::pending_resumes` for one `ResumeKey`.
///
/// Cloning this struct clones the `Arc`, so clones share the same `Notify`.
#[derive(Debug, Clone)]
pub struct ResumePending {
/// Shared notification handle.
/// NOTE(review): presumably signalled once `answer` has been filled in — confirm.
pub notify: Arc<Notify>,
/// Answer value, if one has been stored; `None` right after `new`.
pub answer: Option<Value>,
}
impl ResumePending {
pub fn new() -> Self {
Self {
notify: Arc::new(Notify::new()),
answer: None,
}
}
}
impl Default for ResumePending {
fn default() -> Self {
Self::new()
}
}
/// Collection of the engine's in-memory tables.
///
/// `EngineState::new` creates every table empty and sets `event_log_max` to 1024.
#[derive(Debug)]
pub struct EngineState {
/// Task records keyed by task id.
pub tasks: HashMap<TaskId, TaskState>,
/// Operator sessions keyed by session id.
pub sessions: HashMap<SessionId, OperatorSession>,
/// Strings keyed by `(TaskId, u32)`.
/// NOTE(review): presumably the prompt per (task, attempt) — confirm.
pub prompts: HashMap<(TaskId, u32), String>,
/// Optional strings keyed by `(TaskId, u32)`.
/// NOTE(review): presumably the system prompt per (task, attempt) — confirm.
pub systems: HashMap<(TaskId, u32), Option<String>>,
// `tokens`: capability-token records keyed by a string (the key's meaning is
// set by the callers). `worker_handles`: string-to-string map whose meaning
// is not visible in this file — NOTE(review): confirm against the callers.
pub tokens: HashMap<String, CapTokenRecord>, pub worker_handles: HashMap<String, String>,
/// Pending resume entries keyed by resume key.
pub pending_resumes: HashMap<ResumeKey, ResumePending>,
/// One shared `Notify` per task, created on demand by `ensure_task_notify`.
pub task_notifies: HashMap<TaskId, Arc<Notify>>,
/// JSON values keyed by a string.
pub resources: HashMap<String, Value>,
/// Worker output events keyed by `(TaskId, u32)`.
/// NOTE(review): the `u32` is presumably the attempt number — confirm.
pub output_store: HashMap<(TaskId, u32), Vec<crate::worker::output::OutputEvent>>,
/// Most recent events, oldest first; `push_event` appends here and trims the front.
pub event_log_tail: Vec<Event>,
/// Maximum length `push_event` allows `event_log_tail` to reach.
pub event_log_max: usize,
}
impl EngineState {
pub fn new() -> Self {
Self {
tasks: HashMap::new(),
sessions: HashMap::new(),
prompts: HashMap::new(),
systems: HashMap::new(),
tokens: HashMap::new(),
worker_handles: HashMap::new(),
pending_resumes: HashMap::new(),
task_notifies: HashMap::new(),
resources: HashMap::new(),
output_store: HashMap::new(),
event_log_tail: Vec::new(),
event_log_max: 1024,
}
}
pub fn ensure_task_notify(&mut self, task_id: &TaskId) -> Arc<Notify> {
self.task_notifies
.entry(task_id.clone())
.or_insert_with(|| Arc::new(Notify::new()))
.clone()
}
pub fn push_event(&mut self, ev: Event) {
self.event_log_tail.push(ev);
if self.event_log_tail.len() > self.event_log_max {
let overflow = self.event_log_tail.len() - self.event_log_max;
self.event_log_tail.drain(..overflow);
}
}
}
impl Default for EngineState {
fn default() -> Self {
Self::new()
}
}