Skip to main content

atomr_agents_channel_core/
thread.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use atomr_agents_callable::Callable;
5use atomr_agents_core::{CallCtx, Message, MessageRole, Value};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8
9use crate::content::{InboundMessage, MessageContent};
10use crate::ids::{ChannelId, PeerId, ThreadId};
11use crate::target::ThreadTarget;
12
/// Lightweight policy controlling thread behavior.
///
/// Both fields carry a serde default, so either one may be omitted from
/// serialized input.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ThreadPolicy {
    /// Maximum number of `Message`s kept in `history`. Older messages
    /// are evicted from the front. `0` means unbounded.
    ///
    /// When missing from serialized input, the value comes from
    /// `ThreadPolicy::default_history_cap`.
    #[serde(default = "ThreadPolicy::default_history_cap")]
    pub history_cap: usize,
    /// If `true`, an unknown peer's first inbound auto-opens a thread
    /// using the channel's default target.
    ///
    /// When missing from serialized input, this is `false`.
    #[serde(default)]
    pub auto_open: bool,
}
25
26impl ThreadPolicy {
27    fn default_history_cap() -> usize {
28        200
29    }
30}
31
32impl Default for ThreadPolicy {
33    fn default() -> Self {
34        Self {
35            history_cap: Self::default_history_cap(),
36            auto_open: false,
37        }
38    }
39}
40
/// One conversation between a peer and a bound target on a channel.
///
/// Stored in the [`ChannelStore`](crate::ChannelStore) by id; the
/// orchestrator holds an `Arc<RwLock<Thread>>` through [`ThreadRef`].
///
/// All fields are public. Only [`Thread::push`] (and the helpers built
/// on it) enforces `policy.history_cap`; writing to `history` directly
/// bypasses the cap.
#[derive(Clone)]
pub struct Thread {
    /// Thread identifier. [`Thread::new`] derives it from the channel
    /// and peer via `ThreadId::for_peer`.
    pub id: ThreadId,
    /// Channel this conversation takes place on.
    pub channel: ChannelId,
    /// Remote participant of the conversation.
    pub peer: PeerId,
    /// What the thread is bound to; see [`ThreadTarget`].
    pub target: ThreadTarget,
    /// Conversation log, oldest message first.
    pub history: Vec<Message>,
    /// Behavior knobs for this thread (history cap, auto-open).
    pub policy: ThreadPolicy,
}
54
55impl Thread {
56    pub fn new(channel: ChannelId, peer: PeerId, target: ThreadTarget) -> Self {
57        let id = ThreadId::for_peer(&channel, &peer);
58        Self {
59            id,
60            channel,
61            peer,
62            target,
63            history: Vec::new(),
64            policy: ThreadPolicy::default(),
65        }
66    }
67
68    pub fn push_user(&mut self, content: &MessageContent) {
69        self.push(MessageRole::User, content.as_text());
70    }
71
72    pub fn push_assistant(&mut self, content: &MessageContent) {
73        self.push(MessageRole::Assistant, content.as_text());
74    }
75
76    pub fn push(&mut self, role: MessageRole, text: String) {
77        self.history.push(Message { role, content: text });
78        if self.policy.history_cap > 0 && self.history.len() > self.policy.history_cap {
79            let excess = self.history.len() - self.policy.history_cap;
80            self.history.drain(0..excess);
81        }
82    }
83}
84
/// Public, shared handle on a [`Thread`]. Implements [`Callable`] so
/// a thread can be embedded as a workflow step or team child.
///
/// Cloning the handle clones the inner `Arc`, so every clone refers to
/// the same underlying thread.
#[derive(Clone)]
pub struct ThreadRef {
    /// Shared thread state behind a `parking_lot` read/write lock.
    inner: Arc<RwLock<Thread>>,
}
91
92impl ThreadRef {
93    pub fn from_arc(inner: Arc<RwLock<Thread>>) -> Self {
94        Self { inner }
95    }
96
97    pub fn new(thread: Thread) -> Self {
98        Self {
99            inner: Arc::new(RwLock::new(thread)),
100        }
101    }
102
103    pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, Thread> {
104        self.inner.read()
105    }
106
107    pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, Thread> {
108        self.inner.write()
109    }
110
111    pub fn id(&self) -> ThreadId {
112        self.inner.read().id.clone()
113    }
114
115    pub fn snapshot(&self) -> Thread {
116        self.inner.read().clone()
117    }
118
119    /// Synthesize an inbound from a free-text string for ad-hoc callers
120    /// (mainly the `Callable` impl). Provider-driven inbound goes
121    /// through the orchestrator instead.
122    pub fn synthetic_inbound(&self, text: String) -> InboundMessage {
123        let t = self.inner.read();
124        InboundMessage {
125            channel_id: t.channel.clone(),
126            thread_id: t.id.clone(),
127            peer: t.peer.clone(),
128            provider_msg_id: format!("synthetic-{}", uuid::Uuid::new_v4()),
129            content: MessageContent::text(text),
130            received_at: chrono::Utc::now(),
131            raw: serde_json::Value::Null,
132        }
133    }
134}
135
136/// `Callable` adapter — lets a thread be embedded anywhere a
137/// `Callable` is expected.
138///
139/// For `ThreadTarget::Callable` targets, we forward `{"user": text, …}`
140/// envelopes that `AgentRef::call` already understands. For
141/// `ThreadTarget::Harness` targets we call the wrapped callable (which
142/// is `HarnessRef`); see [`crate::ThreadTarget`].
143#[async_trait]
144impl Callable for ThreadRef {
145    async fn call(&self, input: Value, ctx: CallCtx) -> atomr_agents_core::Result<Value> {
146        let text = extract_user_text(&input);
147        let (target, channel, peer, thread_id, history) = {
148            let g = self.inner.read();
149            (
150                g.target.clone(),
151                g.channel.clone(),
152                g.peer.clone(),
153                g.id.clone(),
154                g.history.clone(),
155            )
156        };
157        let envelope = serde_json::json!({
158            "user": text,
159            "thread": { "id": thread_id.as_str(), "history_len": history.len() },
160            "channel": { "id": channel.as_str(), "peer": peer.as_str() },
161        });
162        match target {
163            ThreadTarget::Callable(handle) => handle.call(envelope, ctx).await,
164            ThreadTarget::Harness { callable, .. } => callable.call(envelope, ctx).await,
165        }
166    }
167
168    fn label(&self) -> &str {
169        "thread"
170    }
171}
172
173fn extract_user_text(input: &Value) -> String {
174    match input {
175        Value::String(s) => s.clone(),
176        Value::Object(m) => m
177            .get("user")
178            .and_then(|v| v.as_str())
179            .unwrap_or_default()
180            .to_string(),
181        other => other.to_string(),
182    }
183}