tirea-agentos 0.5.0

Agent runtime with streaming LLM integration, sub-agent orchestration, and context window management
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
use super::errors::{AgentOsResolveError, AgentOsRunError};
use super::launch::RunLaunchSpec;
use super::prepare::{
    clear_tool_call_scope_state, request_has_user_input, run_lifecycle_running_patch,
    run_scope_cleanup_patches, set_or_validate_parent_thread_id, ActiveRunCleanupGuard,
};
use super::types::{AgentOs, AgentStateStoreStateCommitter, PreparedRun, RunStream};
use super::ResolvedRun;

use crate::composition::AgentOsWiringError;
use crate::contracts::runtime::RunIdentity;
use crate::contracts::storage::{ThreadHead, ThreadStore, VersionPrecondition};
use crate::contracts::thread::{CheckpointReason, Message, Thread};
use crate::contracts::{RunContext, RunRequest};
use crate::runtime::loop_runner::{
    run_loop_stream_with_context, AgentLoopError, RunCancellationToken,
};
use futures::StreamExt;
use std::sync::Arc;

impl AgentOs {
    /// Returns the configured agent state store, if any.
    pub fn agent_state_store(&self) -> Option<&Arc<dyn ThreadStore>> {
        self.agent_state_store.as_ref()
    }

    /// Returns the configured agent state store.
    ///
    /// # Errors
    /// [`AgentOsRunError::AgentStateStoreNotConfigured`] when no store is set.
    fn require_agent_state_store(&self) -> Result<&Arc<dyn ThreadStore>, AgentOsRunError> {
        self.agent_state_store
            .as_ref()
            .ok_or(AgentOsRunError::AgentStateStoreNotConfigured)
    }

    /// Generates a fresh identifier: a UUIDv7 rendered in its hyphen-less
    /// ("simple") form.
    fn generate_id() -> String {
        uuid::Uuid::now_v7().simple().to_string()
    }

    /// Load a thread from storage. Returns the thread head (thread plus its
    /// version), or `None` if the thread does not exist.
    ///
    /// # Errors
    /// Fails when no agent state store is configured or the store load fails.
    pub async fn load_thread(&self, id: &str) -> Result<Option<ThreadHead>, AgentOsRunError> {
        let agent_state_store = self.require_agent_state_store()?;
        Ok(agent_state_store.load(id).await?)
    }

    /// Returns the id of the run currently active on `thread_id` for `agent_id`.
    ///
    /// The in-memory active-run registry is consulted first. Otherwise the
    /// store's active-run record is used, unless that record names a different
    /// (non-empty) agent id, in which case `None` is returned.
    ///
    /// # Errors
    /// Fails when the registry has no entry and no store is configured, or
    /// when the store lookup fails.
    pub async fn current_run_id_for_thread(
        &self,
        agent_id: &str,
        thread_id: &str,
    ) -> Result<Option<String>, AgentOsRunError> {
        if let Some(run_id) = self.active_run_id_for_thread(agent_id, thread_id).await {
            return Ok(Some(run_id));
        }
        let store = self.require_agent_state_store()?;
        let Some(record) = store.active_run_for_thread(thread_id).await? else {
            return Ok(None);
        };
        if !record.agent_id.is_empty() && record.agent_id != agent_id {
            return Ok(None);
        }
        Ok(Some(record.run_id))
    }

    /// Strips tool-call-scoped state before a run that carries user input.
    ///
    /// Applies only when the request targets an existing thread. If the caller
    /// supplied a state snapshot, that snapshot is the base that gets cleaned,
    /// because it replaces the stored state for existing threads. Otherwise the
    /// stored thread state is cleaned. The cleaned value, if any, is written to
    /// `run_request.state`.
    async fn clear_suspended_calls_before_user_run_input(
        &self,
        run_request: &mut RunRequest,
    ) -> Result<(), AgentOsRunError> {
        let Some(thread_id) = run_request.thread_id.as_deref() else {
            return Ok(());
        };
        if !request_has_user_input(&run_request.messages) {
            return Ok(());
        }

        let store = self.require_agent_state_store()?;
        let Some(head) = store.load(thread_id).await? else {
            return Ok(());
        };
        // Prefer the frontend snapshot. Cleaning the stored state here would
        // silently discard a snapshot the caller explicitly sent.
        // NOTE(review): the stored fallback reads `head.thread.state` without
        // `head.thread.patches`. If stored threads can carry unfolded patches,
        // they are not reflected here, and the later `patches.clear()` drops
        // them. Confirm against the store implementation.
        let base_state = run_request.state.as_ref().unwrap_or(&head.thread.state);
        if let Some(cleaned) = clear_tool_call_scope_state(base_state) {
            run_request.state = Some(cleaned);
        }
        Ok(())
    }

    /// Prepares a run and registers it as the active run for its thread.
    ///
    /// If the request carries messages and names a thread, the run currently
    /// active on that thread is looked up first. After preparation succeeds, it
    /// is cancelled unless it has the same id as the new run.
    ///
    /// Returns the prepared run together with its thread id and run id.
    pub(crate) async fn prepare_active_run_with_spec(
        &self,
        owner_agent_id: &str,
        run_request: RunRequest,
        resolved: ResolvedRun,
        launch: RunLaunchSpec,
    ) -> Result<(PreparedRun, String, String), AgentOsRunError> {
        let previous_run_id = match run_request.thread_id.as_deref() {
            Some(thread_id) if !run_request.messages.is_empty() => {
                self.current_run_id_for_thread(owner_agent_id, thread_id)
                    .await?
            }
            _ => None,
        };

        let prepared = self
            .prepare_run_with_spec(run_request, resolved, launch)
            .await?;
        let thread_id = prepared.thread_id().to_string();
        let run_id = prepared.run_id().to_string();

        if let Some(previous_run_id) = previous_run_id.filter(|candidate| candidate != &run_id) {
            self.cancel_active_run_by_id(&previous_run_id).await;
        }

        self.register_thread_run_handle(
            run_id.clone(),
            owner_agent_id,
            &thread_id,
            RunCancellationToken::new(),
        )
        .await;

        Ok((prepared, thread_id, run_id))
    }

    /// Starts a run previously registered by `prepare_active_run_with_spec`.
    ///
    /// The run uses the cancellation token from its registered handle, and the
    /// run's decision sender is bound to that handle. On any failure after the
    /// handle lookup, the handle is removed so it does not linger in the
    /// registry.
    ///
    /// # Errors
    /// Fails when no active handle exists for `run_id`, when execution cannot
    /// be started, or when binding the decision sender fails.
    pub(crate) async fn start_prepared_active_run(
        &self,
        run_id: &str,
        prepared: PreparedRun,
    ) -> Result<RunStream, AgentOsRunError> {
        let token = self
            .active_thread_run_by_run_id(run_id)
            .await
            .ok_or_else(|| {
                AgentOsRunError::Loop(AgentLoopError::StateError(format!(
                    "active run handle missing for run '{run_id}'",
                )))
            })?
            .cancellation_token();
        let run = match Self::execute_prepared(prepared.with_cancellation_token(token)) {
            Ok(run) => run,
            Err(error) => {
                // Do not leave a registered handle behind for a run that never started.
                self.remove_thread_run_handle(run_id).await;
                return Err(error);
            }
        };
        if !self
            .bind_thread_run_decision_tx(run_id, run.decision_tx.clone())
            .await
        {
            self.remove_thread_run_handle(run_id).await;
            return Err(AgentOsRunError::Loop(AgentLoopError::StateError(format!(
                "active run handle missing for run '{run_id}'",
            ))));
        }
        Ok(self.wrap_run_stream_with_active_handle_cleanup(run))
    }

    /// Prepares, registers, and starts an active run in one call.
    pub async fn start_active_run_with_spec(
        &self,
        owner_agent_id: &str,
        run_request: RunRequest,
        resolved: ResolvedRun,
        launch: RunLaunchSpec,
    ) -> Result<RunStream, AgentOsRunError> {
        let (prepared, _thread_id, run_id) = self
            .prepare_active_run_with_spec(owner_agent_id, run_request, resolved, launch)
            .await?;
        self.start_prepared_active_run(&run_id, prepared).await
    }

    /// Wraps a run's event stream so the active-run registry entry is cleaned
    /// up when the stream is exhausted.
    ///
    /// An `ActiveRunCleanupGuard` is carried in the stream state. It is
    /// explicitly triggered once the inner stream yields `None`.
    fn wrap_run_stream_with_active_handle_cleanup(&self, run: RunStream) -> RunStream {
        let RunStream {
            thread_id,
            run_id,
            decision_tx,
            events,
        } = run;
        let run_id_for_cleanup = run_id.clone();
        let registry = self.active_runs.clone();
        let events = Box::pin(futures::stream::unfold(
            (
                events,
                Some(ActiveRunCleanupGuard::new(run_id_for_cleanup, registry)),
            ),
            |(mut inner, mut cleanup)| async move {
                match inner.next().await {
                    Some(event) => Some((event, (inner, cleanup))),
                    None => {
                        if let Some(mut cleanup) = cleanup.take() {
                            cleanup.cleanup_now().await;
                        }
                        None
                    }
                }
            },
        ));
        RunStream {
            thread_id,
            run_id,
            decision_tx,
            events,
        }
    }

    /// Prepare a resolved run for execution.
    ///
    /// This handles all deterministic pre-run logic:
    /// 1. Thread loading/creation from storage
    /// 2. Message deduplication and appending
    /// 3. Persisting pre-run state
    /// 4. Run-context creation
    ///
    /// Callers resolve first, optionally customize, then prepare:
    /// ```ignore
    /// let mut resolved = os.resolve("my-agent")?;
    /// resolved.tools.insert("extra".into(), tool);
    /// let prepared = os.prepare_run(request, resolved).await?;
    /// ```
    pub async fn prepare_run(
        &self,
        request: RunRequest,
        resolved: ResolvedRun,
    ) -> Result<PreparedRun, AgentOsRunError> {
        self.prepare_run_with_spec(request, resolved, RunLaunchSpec::DURABLE)
            .await
    }

    /// Prepare a resolved run using an explicit launch policy.
    ///
    /// This powers both durable runs and thread-only/dialog-style runs while
    /// keeping launch semantics in a named value object instead of boolean
    /// parameters. When the launch spec strips lineage, the request's
    /// `run_id`, `parent_run_id`, and `parent_thread_id` are cleared first.
    pub async fn prepare_run_with_spec(
        &self,
        mut request: RunRequest,
        resolved: ResolvedRun,
        launch: RunLaunchSpec,
    ) -> Result<PreparedRun, AgentOsRunError> {
        if launch.strip_lineage() {
            request.run_id = None;
            request.parent_run_id = None;
            request.parent_thread_id = None;
        }
        self.clear_suspended_calls_before_user_run_input(&mut request)
            .await?;

        self.prepare_run_with_persistence_flag(request, resolved, launch.persist_run_mapping())
            .await
    }

    /// Core run preparation.
    ///
    /// Validates behavior wiring, loads or creates the thread, appends new
    /// messages plus run-start patches in a single version-checked changeset,
    /// and builds the `PreparedRun`. When `persist_run` is true, the changeset
    /// also carries a `Running` run-meta record.
    async fn prepare_run_with_persistence_flag(
        &self,
        mut request: RunRequest,
        resolved: ResolvedRun,
        persist_run: bool,
    ) -> Result<PreparedRun, AgentOsRunError> {
        let agent_state_store = self.require_agent_state_store()?;

        // 0. Behavior uniqueness: wiring ensures base uniqueness, but callers
        //    may mutate `resolved.agent.behavior` after resolve. Validate the
        //    final composed behavior_ids for duplicates *before* any storage
        //    side effect. A rejected run then never leaves a newly created
        //    thread, appended messages, or a `Running` run record behind.
        {
            let ids = resolved.agent.behavior.behavior_ids();
            let mut seen = std::collections::HashSet::with_capacity(ids.len());
            for id in &ids {
                if !seen.insert(*id) {
                    return Err(AgentOsRunError::Resolve(AgentOsResolveError::Wiring(
                        AgentOsWiringError::BehaviorAlreadyInstalled(id.to_string()),
                    )));
                }
            }
        }

        let thread_id = request.thread_id.unwrap_or_else(Self::generate_id);
        let run_id = request.run_id.unwrap_or_else(Self::generate_id);
        let parent_run_id = request.parent_run_id.clone();
        let parent_thread_id = request.parent_thread_id.clone();
        let initial_decisions = std::mem::take(&mut request.initial_decisions);

        // 1. Load or create thread
        //    If frontend sent a state snapshot, apply it:
        //    - New thread: used as initial state
        //    - Existing thread: replaces current state (persisted in UserMessage delta)
        let frontend_state = request.state.take();
        let mut state_snapshot_for_delta: Option<serde_json::Value> = None;
        let (mut thread, mut version) = match agent_state_store.load(&thread_id).await? {
            Some(head) => {
                let mut t = head.thread;
                if let Some(state) = frontend_state {
                    t.state = state.clone();
                    t.patches.clear();
                    state_snapshot_for_delta = Some(state);
                }
                (t, head.version)
            }
            None => {
                let thread = if let Some(state) = frontend_state {
                    Thread::with_initial_state(thread_id.clone(), state)
                } else {
                    Thread::new(thread_id.clone())
                };
                let committed = agent_state_store.create(&thread).await?;
                (thread, committed.version)
            }
        };
        let parent_thread_id_updated =
            set_or_validate_parent_thread_id(&mut thread, &thread_id, parent_thread_id.as_deref())?;
        if parent_thread_id_updated {
            agent_state_store.save(&thread).await?;
            let refreshed = agent_state_store.load(&thread_id).await?.ok_or_else(|| {
                AgentOsRunError::Loop(AgentLoopError::StateError(format!(
                    "thread '{thread_id}' disappeared after parent_thread_id update",
                )))
            })?;
            thread = refreshed.thread;
            version = refreshed.version;
        }

        // 2. Deduplicate inbound messages against the *full* loaded history.
        //    This must happen before boundary trimming. Otherwise IDs of
        //    pre-boundary messages are invisible, and clients that resend full
        //    history would get those messages appended a second time.
        let mut deduped_messages = Self::dedup_messages(&thread, request.messages);

        // 2a. Lazy context loading: trim pre-boundary messages.
        crate::runtime::context::trim_thread_to_latest_boundary(&mut thread);

        // 3. Set resource_id on thread if provided
        if let Some(ref resource_id) = request.resource_id {
            thread.resource_id = Some(resource_id.clone());
        }

        // 3a. Tag and append the new inbound messages.
        if !deduped_messages.is_empty() {
            deduped_messages = Self::attach_run_metadata_to_messages(deduped_messages, &run_id);
            thread = thread.with_messages(deduped_messages.clone());
        }

        // 4. Persist run-start changes (user messages + frontend state snapshot + run state)
        let delta_messages: Vec<Arc<Message>> =
            deduped_messages.into_iter().map(Arc::new).collect();
        // 4a. Clean up stale Run-scoped state from any previous run.
        let mut delta_patches =
            run_scope_cleanup_patches(&thread.state, &resolved.agent.state_scope_registry);
        // 4b. Apply cleanup patches to in-memory thread state so the lifecycle
        //     patch reducer sees a clean base.
        //     NOTE(review): these same cleanup patches are also attached below
        //     via `with_patches(delta_patches)`, on top of a state that already
        //     contains them. Confirm that re-applying them is idempotent.
        for cp in &delta_patches {
            thread.state =
                tirea_state::apply_patch(&thread.state, cp.patch()).map_err(|error| {
                    AgentOsRunError::Loop(AgentLoopError::StateError(format!(
                        "failed to apply run-scope cleanup patch for thread '{thread_id}': {error}"
                    )))
                })?;
        }
        delta_patches.push(run_lifecycle_running_patch(&thread.state, &run_id)?);
        let mut changeset = crate::contracts::ThreadChangeSet::from_parts(
            run_id.clone(),
            parent_run_id.clone(),
            CheckpointReason::UserMessage,
            delta_messages,
            delta_patches.clone(),
            Vec::new(),
            state_snapshot_for_delta,
        );
        if persist_run {
            changeset = changeset.with_run_meta(crate::contracts::RunMeta {
                agent_id: request.agent_id.clone(),
                origin: request.origin,
                status: crate::contracts::storage::RunStatus::Running,
                parent_thread_id: parent_thread_id.clone(),
                termination_code: None,
                termination_detail: None,
                source_mailbox_entry_id: request.source_mailbox_entry_id.clone(),
                input_tokens: 0,
                output_tokens: 0,
            });
        }
        // Optimistic concurrency: the append only succeeds against the version we loaded.
        let committed = agent_state_store
            .append(&thread_id, &changeset, VersionPrecondition::Exact(version))
            .await?;
        version = committed.version;
        thread = thread.with_patches(delta_patches);
        thread.metadata.version = Some(version);

        // 5. Build the run identity (lineage + optional parent tool call).
        let mut run_identity = RunIdentity::new(
            thread_id.clone(),
            parent_thread_id.clone(),
            run_id.clone(),
            parent_run_id.clone(),
            request.agent_id.clone(),
            request.origin,
        );
        if let Some(parent_tool_call_id) = resolved.parent_tool_call_id.clone() {
            run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
        }

        // 6. Build the run context and pre-load any initial decisions into the channel.
        let run_ctx = RunContext::from_thread_with_registry_and_identity(
            &thread,
            resolved.run_policy,
            run_identity.clone(),
            resolved.agent.lattice_registry.clone(),
        )
        .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
        let (decision_tx, decision_rx) = tokio::sync::mpsc::unbounded_channel();
        for decision in initial_decisions {
            decision_tx
                .send(decision)
                .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
        }

        Ok(PreparedRun {
            agent: Arc::new(resolved.agent),
            tools: resolved.tools,
            run_ctx,
            cancellation_token: None,
            state_committer: Some(Arc::new(AgentStateStoreStateCommitter::new(
                agent_state_store.clone(),
                persist_run,
            ))),
            decision_tx,
            decision_rx,
        })
    }

    /// Execute a previously prepared run.
    ///
    /// Returns a `RunStream` whose `events` drive the agent loop. The loop runs
    /// only as the stream is polled. This function currently always returns `Ok`.
    pub fn execute_prepared(prepared: PreparedRun) -> Result<RunStream, AgentOsRunError> {
        let thread_id = prepared.thread_id().to_string();
        let run_id = prepared.run_id().to_string();
        let run_identity = prepared.run_ctx.run_identity().clone();
        let events = run_loop_stream_with_context(
            prepared.agent,
            prepared.tools,
            prepared.run_ctx,
            run_identity,
            prepared.cancellation_token,
            prepared.state_committer,
            Some(prepared.decision_rx),
        );
        Ok(RunStream {
            thread_id,
            run_id,
            decision_tx: prepared.decision_tx,
            events,
        })
    }

    /// Resolve, prepare, and execute an agent run.
    ///
    /// This is the primary entry point. Callers that need to customize
    /// the resolved wiring should use `resolve` + mutation + `prepare_run`
    /// + `execute_prepared` instead.
    pub async fn run_stream(&self, request: RunRequest) -> Result<RunStream, AgentOsRunError> {
        let resolved = self.resolve(&request.agent_id)?;
        let prepared = self.prepare_run(request, resolved).await?;
        Self::execute_prepared(prepared)
    }

    /// Resolve, prepare, and execute an agent run.
    ///
    /// When multiple agents are registered, `HandoffPlugin` handles dynamic
    /// agent switching within the run via `agent_handoff`. No termination or
    /// re-resolution occurs — this method simply delegates to `run_stream`.
    #[cfg(feature = "handoff")]
    pub async fn run(&self, request: RunRequest) -> Result<RunStream, AgentOsRunError> {
        self.run_stream(request).await
    }

    /// Deduplicate incoming messages against existing thread messages.
    ///
    /// Skips messages whose ID or tool_call_id already exists in the thread.
    /// Incoming messages are not deduplicated against each other.
    fn dedup_messages(thread: &Thread, incoming: Vec<Message>) -> Vec<Message> {
        use std::collections::HashSet;

        let existing_ids: HashSet<&str> = thread
            .messages
            .iter()
            .filter_map(|m| m.id.as_deref())
            .collect();
        let existing_tool_call_ids: HashSet<&str> = thread
            .messages
            .iter()
            .filter_map(|m| m.tool_call_id.as_deref())
            .collect();

        incoming
            .into_iter()
            .filter(|m| {
                // Dedup tool messages by tool_call_id
                let seen_tool_call = m
                    .tool_call_id
                    .as_deref()
                    .is_some_and(|tc_id| existing_tool_call_ids.contains(tc_id));
                // Dedup by message id
                let seen_id = m
                    .id
                    .as_deref()
                    .is_some_and(|id| existing_ids.contains(id));
                !seen_tool_call && !seen_id
            })
            .collect()
    }

    /// Stamps `run_id` into each message's metadata, creating default
    /// metadata where none is present. Other metadata fields are preserved.
    fn attach_run_metadata_to_messages(mut messages: Vec<Message>, run_id: &str) -> Vec<Message> {
        for message in &mut messages {
            message.metadata.get_or_insert_with(Default::default).run_id =
                Some(run_id.to_string());
        }
        messages
    }
}