solti-core
Orchestration layer for the solti task system.
Bridges solti-model (public API types) with the taskvisor runtime.
Provides SupervisorApi - the main entry point for submitting, querying, and cancelling tasks.
Architecture
```text
SupervisorApi
  submit(spec)
    ├──► spec.validate()
    ├──► RunnerRouter::build(spec) → TaskRef
    └──► submit_with_task(task, spec)
           ├──► state.add_task(id, spec)
           ├──► map policies → ControllerSpec
           └──► handle.submit(controller_spec)

  taskvisor events ──► StateSubscriber ──► TaskState
                             └─► OutputRegistry
                                 (RunStarted / RunFinished / evict)

  query_tasks(q)    ──► TaskState ──► TaskPage<Task>
  get_task(id)      ──► TaskState ──► Option<Task>
  list_task_runs    ──► TaskState ──► Vec<TaskRun>
  output_registry() ──► Arc<OutputRegistry>  (live-tail subs)

  new(..., state_cfg) ──► auto-starts sweep task
        └──► submit_with_task(state_sweep(state, state_cfg))
```
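The order of steps inside `submit` matters. The task is recorded in state before the runtime sees it, and that is why `TaskAdded` is only traced later. The sketch below shows that ordering. It assumes hypothetical `Spec`, `SubmitError`, and `MiniSupervisor` types, which are not the crate's API. Runner building and policy mapping are omitted, and a `Vec` stands in for the taskvisor handle.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a solti-model task spec.
#[derive(Clone, Debug)]
pub struct Spec {
    pub slot: String,
    pub timeout_ms: u64,
}

#[derive(Debug, PartialEq)]
pub enum SubmitError {
    InvalidSpec(String),
}

impl Spec {
    /// Hypothetical validation rules: non-empty slot, non-zero timeout.
    pub fn validate(&self) -> Result<(), SubmitError> {
        if self.slot.is_empty() {
            return Err(SubmitError::InvalidSpec("slot must not be empty".into()));
        }
        if self.timeout_ms == 0 {
            return Err(SubmitError::InvalidSpec("timeout must be > 0".into()));
        }
        Ok(())
    }
}

/// Hypothetical, heavily simplified supervisor facade.
#[derive(Default)]
pub struct MiniSupervisor {
    next_id: u64,
    /// Stands in for TaskState: id -> spec.
    pub state: HashMap<u64, Spec>,
    /// Stands in for the taskvisor handle: ids in submission order.
    pub runtime_queue: Vec<u64>,
}

impl MiniSupervisor {
    pub fn submit(&mut self, spec: Spec) -> Result<u64, SubmitError> {
        // 1. Validate before anything is stored or scheduled.
        spec.validate()?;
        self.next_id += 1;
        let id = self.next_id;
        // 2. Record the task in state first...
        self.state.insert(id, spec);
        // 3. ...then hand it to the runtime, so a later TaskAdded
        //    event has nothing left to add.
        self.runtime_queue.push(id);
        Ok(id)
    }
}
```

A rejected spec never reaches either the state or the runtime queue.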
Event flow
```text
taskvisor runtime
  │
  ├──► TaskAdded      → (traced only; task is already in state from submit)
  ├──► TaskStarting   → transition_starting + announce_run_started
  ├──► TaskStopped    → transition_finished(Succeeded) + announce_run_finished
  ├──► TaskFailed     → transition_finished(Failed) + announce_run_finished
  ├──► TimeoutHit     → transition_finished(Timeout) + announce_run_finished
  ├──► ActorExhausted → transition_finished(Exhausted) + announce_run_finished + evict
  ├──► ActorDead      → transition_finished(Failed) + announce_run_finished + evict
  └──► TaskRemoved    → unregister_task + evict
```
The `announce_*` and `evict` calls go to an `Arc<OutputRegistry>` shared with the runner side. This is what bridges supervisor lifecycle events into the live-tail broadcast channel that subscribers (HTTP SSE, gRPC streams) read from.
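The event flow above can be sketched as a single `match`. This is a minimal, std-only illustration, not the crate's `StateSubscriber`. `Event`, `RunStatus`, `OutputMsg`, `Subscriber`, and the `mpsc`-based `OutputRegistry` are hypothetical stand-ins; the real registry feeds a broadcast channel. A per-task `last_status` map replaces `TaskState`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

pub type TaskId = u64;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RunStatus { Running, Succeeded, Failed, Timeout, Exhausted }

/// Hypothetical subset of runtime events, named after the flow above.
pub enum Event {
    TaskAdded(TaskId),
    TaskStarting(TaskId),
    TaskStopped(TaskId),
    TaskFailed(TaskId),
    TimeoutHit(TaskId),
    ActorExhausted(TaskId),
    ActorDead(TaskId),
    TaskRemoved(TaskId),
}

/// Messages pushed to live-tail subscribers.
#[derive(Clone, Debug, PartialEq)]
pub enum OutputMsg { RunStarted, RunFinished(RunStatus) }

/// Hypothetical registry: per-task list of subscriber channels.
#[derive(Default)]
pub struct OutputRegistry {
    subs: HashMap<TaskId, Vec<Sender<OutputMsg>>>,
}

impl OutputRegistry {
    pub fn subscribe(&mut self, id: TaskId) -> Receiver<OutputMsg> {
        let (tx, rx) = channel();
        self.subs.entry(id).or_default().push(tx);
        rx
    }

    fn announce(&self, id: TaskId, msg: OutputMsg) {
        for tx in self.subs.get(&id).into_iter().flatten() {
            // A disconnected subscriber is not an error for the supervisor.
            let _ = tx.send(msg.clone());
        }
    }

    /// Dropping the senders closes every subscriber channel for this task.
    fn evict(&mut self, id: TaskId) {
        self.subs.remove(&id);
    }
}

/// Hypothetical subscriber; last_status stands in for TaskState.
#[derive(Default)]
pub struct Subscriber {
    pub last_status: HashMap<TaskId, RunStatus>,
    pub registry: OutputRegistry,
}

impl Subscriber {
    pub fn on_event(&mut self, ev: Event) {
        match ev {
            // Already in state from submit: trace only.
            Event::TaskAdded(_) => {}
            Event::TaskStarting(id) => {
                self.last_status.insert(id, RunStatus::Running);
                self.registry.announce(id, OutputMsg::RunStarted);
            }
            Event::TaskStopped(id) => self.finish(id, RunStatus::Succeeded, false),
            Event::TaskFailed(id) => self.finish(id, RunStatus::Failed, false),
            Event::TimeoutHit(id) => self.finish(id, RunStatus::Timeout, false),
            Event::ActorExhausted(id) => self.finish(id, RunStatus::Exhausted, true),
            Event::ActorDead(id) => self.finish(id, RunStatus::Failed, true),
            Event::TaskRemoved(id) => {
                self.last_status.remove(&id); // unregister_task
                self.registry.evict(id);
            }
        }
    }

    fn finish(&mut self, id: TaskId, status: RunStatus, evict: bool) {
        self.last_status.insert(id, status);
        self.registry.announce(id, OutputMsg::RunFinished(status));
        if evict {
            self.registry.evict(id);
        }
    }
}
```

Evicting after the final announce means a subscriber first receives the terminal message, and only then sees its channel close.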
Key types
| Type | Visibility | Description |
|---|---|---|
| `SupervisorApi` | pub | High-level facade: submit, query, cancel, sweep, `output_registry()` accessor |
| `StateConfig` | pub | TTL settings for runs, tasks, and sweep interval |
| `CoreError` | pub | Error enum: `Supervisor`, `Mapping`, `Runner`, `InvalidSpec` |
| `uptime_seconds()` | pub | Agent uptime helper (`OnceLock<Instant>`) |
| `TaskState` | internal | In-memory storage (`Arc<RwLock>`); wired by `SupervisorApi::new` |
| `StateSubscriber` | internal | `Subscribe` impl; auto-registered by `SupervisorApi::new` |
| `state_sweep()` | internal | Embedded periodic sweeper task; auto-submitted by `SupervisorApi::new` |
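The uptime helper might look like the sketch below, assuming a process-wide static `OnceLock<Instant>`. The `init_uptime` name and the "0 before init" behavior are hypothetical. The table only states that `uptime_seconds()` is backed by `OnceLock<Instant>`, and the notes say `SupervisorApi::new` initializes it.

```rust
use std::sync::OnceLock;
use std::time::Instant;

/// Process-wide start instant; written at most once.
static STARTED_AT: OnceLock<Instant> = OnceLock::new();

/// Hypothetical initializer (e.g. called from a constructor).
/// The first call wins; later calls leave the instant unchanged.
pub fn init_uptime() {
    let _ = STARTED_AT.get_or_init(Instant::now);
}

/// Whole seconds since init_uptime() was first called.
/// Assumption: returns 0 if it was never called.
pub fn uptime_seconds() -> u64 {
    STARTED_AT
        .get()
        .map(|t| t.elapsed().as_secs())
        .unwrap_or(0)
}
```

`OnceLock` makes repeated construction harmless: only the first initialization sets the start time.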
State storage
```text
TaskState (Arc<RwLock<TaskStateInner>>)
┌──────────────────────────────────────────────┐
│ tasks:   HashMap<TaskId, Task>               │
│ by_slot: HashMap<Slot, Vec<TaskId>>          │  ← index for slot queries
│ runs:    HashMap<TaskId, VecDeque<TaskRun>>  │
└──────────────────────────────────────────────┘
```
Queries use the by_slot index when a slot filter is present to avoid full scans.
Pagination is deterministic (sorted by TaskId).
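The sketch below shows the slot index and deterministic pagination using only std. It rests on several assumptions:

- `TaskId` and `Slot` are plain `String`s.
- `Task` carries only an id and a slot.
- The `runs` map is omitted.
- `query` is a hypothetical simplification of `query_tasks`.

It uses `std::sync::RwLock` to stay self-contained. The crate itself uses `parking_lot::RwLock`, which does not poison and therefore needs no `unwrap()`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

pub type TaskId = String;
pub type Slot = String;

#[derive(Clone, Debug)]
pub struct Task {
    pub id: TaskId,
    pub slot: Slot,
}

#[derive(Default)]
struct Inner {
    tasks: HashMap<TaskId, Task>,
    by_slot: HashMap<Slot, Vec<TaskId>>,
}

/// Cloning shares the same storage through the Arc.
#[derive(Clone, Default)]
pub struct TaskState {
    inner: Arc<RwLock<Inner>>,
}

impl TaskState {
    pub fn add_task(&self, task: Task) {
        let mut g = self.inner.write().unwrap();
        // If the id already existed, drop it from its old slot bucket.
        if let Some(old) = g.tasks.insert(task.id.clone(), task.clone()) {
            if let Some(ids) = g.by_slot.get_mut(&old.slot) {
                ids.retain(|id| id != &old.id);
            }
        }
        g.by_slot.entry(task.slot.clone()).or_default().push(task.id);
    }

    /// One page of task ids, optionally filtered by slot.
    pub fn query(&self, slot: Option<&str>, offset: usize, limit: usize) -> Vec<TaskId> {
        let g = self.inner.read().unwrap();
        let mut ids: Vec<TaskId> = match slot {
            // Slot filter: read only that index bucket, no full scan.
            Some(s) => g.by_slot.get(s).cloned().unwrap_or_default(),
            None => g.tasks.keys().cloned().collect(),
        };
        // HashMap iteration order is arbitrary; sorting makes pages stable.
        ids.sort();
        ids.into_iter().skip(offset).take(limit).collect()
    }
}
```

Sorting after the index lookup keeps the page order independent of insertion order and hash seeds.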
State sweep
```text
SupervisorApi::new(..., StateConfig)
  └──► auto-starts embedded periodic task (slot: "solti-state-sweep")
         ├──► pass 1: remove finished runs older than run_ttl
         └──► pass 2: remove terminal tasks with no runs past task_ttl
```
| Parameter | Default | Controls |
|---|---|---|
| `run_ttl` | 1 hour | How long finished runs are retained |
| `task_ttl` | 1 hour | How long terminal tasks are retained |
| `sweep_interval` | 5 minutes | Sweep frequency (via `RestartPolicy::periodic`) |
Sweep is always-on. Configure TTLs via StateConfig if defaults don't fit.
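The two sweep passes and the default TTLs can be sketched as below. The field names come from the parameter table. Everything else is assumed:

- the field types;
- the `TaskEntry` and `Run` shapes;
- `terminal_since` as the timestamp that `task_ttl` is measured from;
- reading pass 2 as "terminal, no runs left, and older than `task_ttl`".

`sweep_interval` is only carried in the config here. In the crate it drives `RestartPolicy::periodic`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Field names follow the parameter table; types are assumed.
pub struct StateConfig {
    pub run_ttl: Duration,
    pub task_ttl: Duration,
    pub sweep_interval: Duration,
}

impl Default for StateConfig {
    fn default() -> Self {
        Self {
            run_ttl: Duration::from_secs(60 * 60),       // 1 hour
            task_ttl: Duration::from_secs(60 * 60),      // 1 hour
            sweep_interval: Duration::from_secs(5 * 60), // 5 minutes
        }
    }
}

/// Hypothetical run record: None means the run is still in progress.
pub struct Run {
    pub finished_at: Option<Instant>,
}

/// Hypothetical task record: Some(t) once the task became terminal at t.
pub struct TaskEntry {
    pub terminal_since: Option<Instant>,
}

pub fn sweep(
    tasks: &mut HashMap<u64, TaskEntry>,
    runs: &mut HashMap<u64, Vec<Run>>,
    cfg: &StateConfig,
    now: Instant,
) {
    // Pass 1: drop finished runs older than run_ttl; in-progress runs stay.
    for list in runs.values_mut() {
        list.retain(|r| match r.finished_at {
            Some(t) => now.duration_since(t) < cfg.run_ttl,
            None => true,
        });
    }
    runs.retain(|_, list| !list.is_empty());

    // Pass 2: drop terminal tasks that have no runs left and are past task_ttl.
    tasks.retain(|id, t| {
        let expired = t
            .terminal_since
            .map_or(false, |ts| now.duration_since(ts) >= cfg.task_ttl);
        !(expired && !runs.contains_key(id))
    });
}
```

Running pass 1 first means a task whose last run just expired can be collected in the same sweep.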
Policy mapping
```text
solti-model                   taskvisor
───────────                   ─────────
AdmissionPolicy::Replace    → AdmissionPolicy::Replace
RestartPolicy::OnFailure    → RestartPolicy::OnFailure
JitterPolicy::Equal         → JitterPolicy::Equal
BackoffPolicy { first_ms }  → BackoffPolicy { first: Duration }
```
Model enums are `#[non_exhaustive]`; unknown variants fall back to safe defaults: `AdmissionPolicy::DropIfRunning`, `RestartPolicy::Never`, and `JitterPolicy::Full`.
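The sketch below shows the mapping pattern. Hypothetical `model` and `runtime` modules stand in for solti-model and taskvisor, and the variants are trimmed to two. Because the real model enums are declared in another crate, the wildcard arm is mandatory there, and it selects the safe default. In this single-file sketch the arm is unreachable, hence the `allow` attribute.

```rust
use std::time::Duration;

/// Hypothetical stand-in for solti-model types.
pub mod model {
    #[non_exhaustive]
    #[derive(Clone, Copy, Debug)]
    pub enum AdmissionPolicy {
        DropIfRunning,
        Replace,
    }

    #[derive(Clone, Copy, Debug)]
    pub struct BackoffPolicy {
        pub first_ms: u64,
    }
}

/// Hypothetical stand-in for taskvisor types.
pub mod runtime {
    use std::time::Duration;

    #[derive(Clone, Copy, Debug, PartialEq)]
    pub enum AdmissionPolicy {
        DropIfRunning,
        Replace,
    }

    #[derive(Clone, Copy, Debug, PartialEq)]
    pub struct BackoffPolicy {
        pub first: Duration,
    }
}

// The wildcard arm is unreachable inside this crate but required when the
// #[non_exhaustive] enum is defined in a different crate.
#[allow(unreachable_patterns)]
pub fn map_admission(p: model::AdmissionPolicy) -> runtime::AdmissionPolicy {
    match p {
        model::AdmissionPolicy::DropIfRunning => runtime::AdmissionPolicy::DropIfRunning,
        model::AdmissionPolicy::Replace => runtime::AdmissionPolicy::Replace,
        // Unknown (future) variant: fall back to the safe default.
        _ => runtime::AdmissionPolicy::DropIfRunning,
    }
}

/// Milliseconds on the model side become a Duration on the runtime side.
pub fn map_backoff(b: model::BackoffPolicy) -> runtime::BackoffPolicy {
    runtime::BackoffPolicy {
        first: Duration::from_millis(b.first_ms),
    }
}
```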
Error model
```text
Variant       Source                      When
───────       ──────                      ────
Supervisor    taskvisor runtime           submit/cancel failure
Mapping       policy conversion           unknown policy variant
Runner        solti_runner::RunnerError   build_task failure
InvalidSpec   solti_model::ModelError     spec validation failure
```
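Here is a hand-written sketch of the error model. `RunnerError` and `ModelError` are hypothetical string wrappers standing in for `solti_runner::RunnerError` and `solti_model::ModelError`. The `String` payloads of `Supervisor` and `Mapping` and the `Display` messages are invented, and the real crate may use a derive macro instead. The `From` impls are what let `?` turn a validation failure into `CoreError::InvalidSpec`.

```rust
use std::fmt;

/// Hypothetical stand-in for solti_runner::RunnerError.
#[derive(Debug)]
pub struct RunnerError(pub String);

/// Hypothetical stand-in for solti_model::ModelError.
#[derive(Debug)]
pub struct ModelError(pub String);

#[derive(Debug)]
pub enum CoreError {
    /// taskvisor submit/cancel failure.
    Supervisor(String),
    /// Unknown policy variant during conversion.
    Mapping(String),
    /// build_task failure.
    Runner(RunnerError),
    /// Spec validation failure.
    InvalidSpec(ModelError),
}

impl fmt::Display for CoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CoreError::Supervisor(m) => write!(f, "supervisor: {m}"),
            CoreError::Mapping(m) => write!(f, "mapping: {m}"),
            CoreError::Runner(e) => write!(f, "runner: {}", e.0),
            CoreError::InvalidSpec(e) => write!(f, "invalid spec: {}", e.0),
        }
    }
}

impl std::error::Error for CoreError {}

impl From<RunnerError> for CoreError {
    fn from(e: RunnerError) -> Self {
        CoreError::Runner(e)
    }
}

impl From<ModelError> for CoreError {
    fn from(e: ModelError) -> Self {
        CoreError::InvalidSpec(e)
    }
}

/// Hypothetical model-level validator.
fn check_slot(slot: &str) -> Result<(), ModelError> {
    if slot.is_empty() {
        Err(ModelError("slot is empty".into()))
    } else {
        Ok(())
    }
}

/// `?` converts ModelError into CoreError::InvalidSpec through From.
pub fn validate(slot: &str) -> Result<(), CoreError> {
    check_slot(slot)?;
    Ok(())
}
```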
Notes
- `SupervisorApi::new` auto-registers `StateSubscriber` into the subscriber list.
- `SupervisorApi::new` creates a fresh, empty `OutputRegistry`; use `new_with_output_registry(...)` to share one with the runner side.
- `TaskState` is `Clone` via `Arc`, so it is safe to share across threads.
- `parking_lot::RwLock` is used instead of `std::sync::RwLock` (no lock poisoning, better performance).
- `unregister_task` (event-driven, on `TaskRemoved`) drops the task entry but keeps its runs until the sweep removes them; `delete_task` (API-driven) drops both the task and its runs immediately.
- `uptime_seconds()` tracks agent lifetime via `OnceLock<Instant>`; it is initialized by `SupervisorApi::new`.
- The sweep task is self-hosted: it runs as an embedded `TaskKind::Embedded` task inside the same supervisor it manages.
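The difference between `unregister_task` and `delete_task` can be sketched with two maps. `State`, its `u64` ids, and the `bool` return of `delete_task` are hypothetical. Only the keep-runs versus drop-runs behavior comes from the notes.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical, minimal state: task payloads and per-task run history.
#[derive(Default)]
pub struct State {
    pub tasks: HashMap<u64, String>,
    pub runs: HashMap<u64, VecDeque<u32>>,
}

impl State {
    /// Event-driven (TaskRemoved): forget the task but keep its runs,
    /// which stay queryable until the sweep expires them.
    pub fn unregister_task(&mut self, id: u64) {
        self.tasks.remove(&id);
    }

    /// API-driven: drop the task and its runs immediately.
    /// Returns true if anything was removed.
    pub fn delete_task(&mut self, id: u64) -> bool {
        let had_task = self.tasks.remove(&id).is_some();
        let had_runs = self.runs.remove(&id).is_some();
        had_task || had_runs
    }
}
```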