1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
use super::builder::AgentService;
use super::types::*;
use crate::brain::agent::error::{AgentError, Result};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
impl AgentService {
/// Send a message and get a response
///
/// This will:
/// 1. Load conversation context from the database
/// 2. Add the new user message
/// 3. Send to the LLM provider
/// 4. Save the response to the database
/// 5. Update token usage
pub async fn send_message(
&self,
session_id: Uuid,
user_message: String,
model: Option<String>,
) -> Result<AgentResponse> {
// Prepare message context (common setup logic)
let (_model_name, request, message_service, session_service) = self
.prepare_message_context(session_id, user_message, model)
.await?;
// Send to provider — use session's provider so a concurrent
// foreground swap on another pane can't hijack this turn.
let provider = self.provider_for_session(session_id);
let response = provider
.complete(request)
.await
.map_err(AgentError::Provider)?;
// Extract text from response
let assistant_text = Self::extract_text_from_response(&response);
// Save assistant response to database
let assistant_db_msg = message_service
.create_message(session_id, "assistant".to_string(), assistant_text.clone())
.await
.map_err(|e| AgentError::Database(e.to_string()))?;
// Calculate total tokens and cost for this message
let billable_input = response.usage.input_tokens
+ response.usage.cache_creation_tokens
+ response.usage.cache_read_tokens;
let total_tokens = billable_input + response.usage.output_tokens;
let cost = self
.provider_for_session(session_id)
.calculate_cost_with_cache(
&response.model,
response.usage.input_tokens,
response.usage.output_tokens,
response.usage.cache_creation_tokens,
response.usage.cache_read_tokens,
);
// Update message with usage info, stashing the server-reported
// prompt token count so session reload reads it directly.
message_service
.update_message_usage(
assistant_db_msg.id,
total_tokens as i32,
cost,
Some(billable_input as i32),
)
.await
.map_err(|e| AgentError::Database(e.to_string()))?;
// Update session token usage
session_service
.update_session_usage(session_id, total_tokens as i32, cost)
.await
.map_err(|e| AgentError::Database(e.to_string()))?;
Ok(AgentResponse {
message_id: assistant_db_msg.id,
content: assistant_text,
stop_reason: response.stop_reason,
context_tokens: response.usage.input_tokens,
usage: response.usage,
cost,
model: response.model,
provider_name: self.provider_name_for_session(session_id),
})
}
/// Start a streaming turn for the given session.
///
/// The request is built by `prepare_message_context`, switched to streaming
/// mode with `with_streaming`, and handed to the session's provider. The
/// returned [`AgentStreamResponse`] carries the provider stream together with
/// the session id, a newly generated v4 UUID as `message_id`, and the model
/// name produced during context preparation, so callers can consume the
/// response chunks incrementally.
///
/// # Errors
///
/// Propagates any error from `prepare_message_context`, and returns
/// [`AgentError::Provider`] if the provider fails to open the stream.
pub async fn send_message_streaming(
    &self,
    session_id: Uuid,
    user_message: String,
    model: Option<String>,
) -> Result<AgentStreamResponse> {
    // Shared setup. The message/session services returned alongside the
    // request are not used on the streaming path.
    let (resolved_model, prepared, _messages, _sessions) = self
        .prepare_message_context(session_id, user_message, model)
        .await?;

    // Flag the prepared request as a streaming request.
    let streaming_request = prepared.with_streaming();

    // Session-scoped provider lookup, so a concurrent /models pick on a
    // different pane can't hijack this turn.
    let session_provider = self.provider_for_session(session_id);
    let opened = session_provider.stream(streaming_request).await;
    let stream = opened.map_err(AgentError::Provider)?;

    Ok(AgentStreamResponse {
        session_id,
        message_id: Uuid::new_v4(),
        stream,
        model: resolved_model,
    })
}
/// Send a message with automatic tool execution (TUI channel).
pub async fn send_message_with_tools(
&self,
session_id: Uuid,
user_message: String,
model: Option<String>,
) -> Result<AgentResponse> {
self.send_message_with_tools_and_mode(session_id, user_message, model, None)
.await
}
/// Shim: send with tools + optional cancellation token (TUI channel).
/// Delegates to `run_tool_loop` with service-level callbacks.
pub async fn send_message_with_tools_and_mode(
&self,
session_id: Uuid,
user_message: String,
model: Option<String>,
cancel_token: Option<CancellationToken>,
) -> Result<AgentResponse> {
self.run_tool_loop(
session_id,
user_message,
None,
model,
cancel_token,
None,
None,
"tui",
None,
)
.await
}
/// Send a message with per-call callback overrides and channel routing.
///
/// `override_approval_callback` and `override_progress_callback` take
/// precedence over the service-level callbacks (used by Telegram, Discord, etc.).
/// Pass `None` to fall back to the service-level callback.
///
/// `channel` and `channel_chat_id` identify the originating channel for
/// crash recovery routing.
///
/// No separate display text is supplied on this path; use
/// `send_message_with_tools_and_display` when the persisted/displayed text
/// should differ from `user_message`.
///
/// # Errors
///
/// Returns whatever error `run_tool_loop` produces.
// The wide signature forwards straight into `run_tool_loop`, so the argument
// count is intentional.
#[allow(clippy::too_many_arguments)]
pub async fn send_message_with_tools_and_callback(
    &self,
    session_id: Uuid,
    user_message: String,
    model: Option<String>,
    cancel_token: Option<CancellationToken>,
    override_approval_callback: Option<ApprovalCallback>,
    override_progress_callback: Option<ProgressCallback>,
    channel: &str,
    channel_chat_id: Option<&str>,
) -> Result<AgentResponse> {
    self.run_tool_loop(
        session_id,
        user_message,
        None, // display_text: this entry point passes none
        model,
        cancel_token,
        override_approval_callback,
        override_progress_callback,
        channel,
        channel_chat_id,
    )
    .await
}
/// Send a message while supplying a separate, human-readable `display_text`
/// for DB persistence and TUI/session display.
///
/// The full `user_message` — typically the channel-wrapped agent input with
/// sender metadata, reply context, group history and channel hints — is still
/// what goes into the LLM context, so the agent keeps all the information it
/// needs. The chat history shown in the TUI, however, mirrors what the user
/// actually typed in the channel.
///
/// Channels (Telegram, Discord, Slack, WhatsApp, Trello) pass the raw user
/// text — or a lightly prefixed `Sender: text` for groups/non-DM scenarios —
/// as `display_text`, so opening the session in OpenCrabs no longer surfaces
/// the LLM-only metadata brackets.
///
/// The remaining parameters have the same meaning as on
/// `send_message_with_tools_and_callback`: optional cancellation token,
/// optional per-call approval/progress callback overrides, and the
/// originating `channel` / `channel_chat_id`.
#[allow(clippy::too_many_arguments)]
pub async fn send_message_with_tools_and_display(
    &self,
    session_id: Uuid,
    user_message: String,
    display_text: Option<String>,
    model: Option<String>,
    cancel_token: Option<CancellationToken>,
    override_approval_callback: Option<ApprovalCallback>,
    override_progress_callback: Option<ProgressCallback>,
    channel: &str,
    channel_chat_id: Option<&str>,
) -> Result<AgentResponse> {
    // Every argument is forwarded unchanged, in `run_tool_loop`'s own order.
    let turn = self.run_tool_loop(
        session_id,
        user_message,
        display_text,
        model,
        cancel_token,
        override_approval_callback,
        override_progress_callback,
        channel,
        channel_chat_id,
    );
    turn.await
}
}