atomr_agents_channel_core/
thread.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use atomr_agents_callable::Callable;
5use atomr_agents_core::{CallCtx, Message, MessageRole, Value};
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8
9use crate::content::{InboundMessage, MessageContent};
10use crate::ids::{ChannelId, PeerId, ThreadId};
11use crate::target::ThreadTarget;
12
13#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
15pub struct ThreadPolicy {
16 #[serde(default = "ThreadPolicy::default_history_cap")]
19 pub history_cap: usize,
20 #[serde(default)]
23 pub auto_open: bool,
24}
25
26impl ThreadPolicy {
27 fn default_history_cap() -> usize {
28 200
29 }
30}
31
32impl Default for ThreadPolicy {
33 fn default() -> Self {
34 Self {
35 history_cap: Self::default_history_cap(),
36 auto_open: false,
37 }
38 }
39}
40
41#[derive(Clone)]
46pub struct Thread {
47 pub id: ThreadId,
48 pub channel: ChannelId,
49 pub peer: PeerId,
50 pub target: ThreadTarget,
51 pub history: Vec<Message>,
52 pub policy: ThreadPolicy,
53}
54
55impl Thread {
56 pub fn new(channel: ChannelId, peer: PeerId, target: ThreadTarget) -> Self {
57 let id = ThreadId::for_peer(&channel, &peer);
58 Self {
59 id,
60 channel,
61 peer,
62 target,
63 history: Vec::new(),
64 policy: ThreadPolicy::default(),
65 }
66 }
67
68 pub fn push_user(&mut self, content: &MessageContent) {
69 self.push(MessageRole::User, content.as_text());
70 }
71
72 pub fn push_assistant(&mut self, content: &MessageContent) {
73 self.push(MessageRole::Assistant, content.as_text());
74 }
75
76 pub fn push(&mut self, role: MessageRole, text: String) {
77 self.history.push(Message { role, content: text });
78 if self.policy.history_cap > 0 && self.history.len() > self.policy.history_cap {
79 let excess = self.history.len() - self.policy.history_cap;
80 self.history.drain(0..excess);
81 }
82 }
83}
84
85#[derive(Clone)]
88pub struct ThreadRef {
89 inner: Arc<RwLock<Thread>>,
90}
91
92impl ThreadRef {
93 pub fn from_arc(inner: Arc<RwLock<Thread>>) -> Self {
94 Self { inner }
95 }
96
97 pub fn new(thread: Thread) -> Self {
98 Self {
99 inner: Arc::new(RwLock::new(thread)),
100 }
101 }
102
103 pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, Thread> {
104 self.inner.read()
105 }
106
107 pub fn write(&self) -> parking_lot::RwLockWriteGuard<'_, Thread> {
108 self.inner.write()
109 }
110
111 pub fn id(&self) -> ThreadId {
112 self.inner.read().id.clone()
113 }
114
115 pub fn snapshot(&self) -> Thread {
116 self.inner.read().clone()
117 }
118
119 pub fn synthetic_inbound(&self, text: String) -> InboundMessage {
123 let t = self.inner.read();
124 InboundMessage {
125 channel_id: t.channel.clone(),
126 thread_id: t.id.clone(),
127 peer: t.peer.clone(),
128 provider_msg_id: format!("synthetic-{}", uuid::Uuid::new_v4()),
129 content: MessageContent::text(text),
130 received_at: chrono::Utc::now(),
131 raw: serde_json::Value::Null,
132 }
133 }
134}
135
136#[async_trait]
144impl Callable for ThreadRef {
145 async fn call(&self, input: Value, ctx: CallCtx) -> atomr_agents_core::Result<Value> {
146 let text = extract_user_text(&input);
147 let (target, channel, peer, thread_id, history) = {
148 let g = self.inner.read();
149 (
150 g.target.clone(),
151 g.channel.clone(),
152 g.peer.clone(),
153 g.id.clone(),
154 g.history.clone(),
155 )
156 };
157 let envelope = serde_json::json!({
158 "user": text,
159 "thread": { "id": thread_id.as_str(), "history_len": history.len() },
160 "channel": { "id": channel.as_str(), "peer": peer.as_str() },
161 });
162 match target {
163 ThreadTarget::Callable(handle) => handle.call(envelope, ctx).await,
164 ThreadTarget::Harness { callable, .. } => callable.call(envelope, ctx).await,
165 }
166 }
167
168 fn label(&self) -> &str {
169 "thread"
170 }
171}
172
173fn extract_user_text(input: &Value) -> String {
174 match input {
175 Value::String(s) => s.clone(),
176 Value::Object(m) => m
177 .get("user")
178 .and_then(|v| v.as_str())
179 .unwrap_or_default()
180 .to_string(),
181 other => other.to_string(),
182 }
183}