1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
use super::builder::AgentService;
use super::types::*;
use crate::brain::agent::error::{AgentError, Result};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
impl AgentService {
    /// Send a message and get a complete (non-streaming) response.
    ///
    /// This will:
    /// 1. Load conversation context from the database
    /// 2. Add the new user message
    /// 3. Send to the LLM provider bound to this session
    /// 4. Save the assistant response to the database
    /// 5. Record token usage and cost on both the message and the session
    ///
    /// Steps 1–2 are delegated to `prepare_message_context`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `prepare_message_context`, returns
    /// [`AgentError::Provider`] if the provider call fails, and
    /// [`AgentError::Database`] if persisting the message or usage fails.
    pub async fn send_message(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
    ) -> Result<AgentResponse> {
        // Prepare message context (common setup logic)
        let (_model_name, request, message_service, session_service) = self
            .prepare_message_context(session_id, user_message, model)
            .await?;

        // Resolve the session's provider ONCE and use it for both the
        // completion and the cost calculation. This keeps a concurrent
        // foreground swap on another pane from hijacking this turn, and from
        // getting the turn priced by a different provider than the one that
        // actually served it.
        let provider = self.provider_for_session(session_id);
        let response = provider
            .complete(request)
            .await
            .map_err(AgentError::Provider)?;

        // Extract text from response
        let assistant_text = Self::extract_text_from_response(&response);

        // Save assistant response to database
        let assistant_db_msg = message_service
            .create_message(session_id, "assistant".to_string(), assistant_text.clone())
            .await
            .map_err(|e| AgentError::Database(e.to_string()))?;

        // Calculate total tokens and cost for this message.
        let usage = &response.usage;
        // Full prompt size: uncached input plus both prompt-cache counts.
        let billable_input = usage
            .input_tokens
            .saturating_add(usage.cache_creation_tokens)
            .saturating_add(usage.cache_read_tokens);
        let total_tokens = billable_input.saturating_add(usage.output_tokens);
        let cost = provider.calculate_cost_with_cache(
            &response.model,
            usage.input_tokens,
            usage.output_tokens,
            usage.cache_creation_tokens,
            usage.cache_read_tokens,
        );

        // The persistence APIs take `i32`; clamp rather than letting an `as`
        // cast silently wrap a (pathologically) large count to a negative one.
        let total_tokens_db = i32::try_from(total_tokens).unwrap_or(i32::MAX);
        let billable_input_db = i32::try_from(billable_input).unwrap_or(i32::MAX);

        // Update message with usage info, stashing the server-reported
        // prompt token count so session reload reads it directly.
        message_service
            .update_message_usage(
                assistant_db_msg.id,
                total_tokens_db,
                cost,
                Some(billable_input_db),
            )
            .await
            .map_err(|e| AgentError::Database(e.to_string()))?;

        // Update session token usage
        session_service
            .update_session_usage(session_id, total_tokens_db, cost)
            .await
            .map_err(|e| AgentError::Database(e.to_string()))?;

        Ok(AgentResponse {
            message_id: assistant_db_msg.id,
            content: assistant_text,
            stop_reason: response.stop_reason,
            // Report the same prompt size that was persisted above, so the
            // live value matches what a session reload shows; `input_tokens`
            // alone omits the cache counts summed into `billable_input`.
            context_tokens: billable_input,
            usage: response.usage,
            cost,
            model: response.model,
        })
    }

    /// Send a message and get a streaming response.
    ///
    /// Returns a stream of response chunks that can be consumed incrementally.
    ///
    /// Unlike [`Self::send_message`], this method does not persist the
    /// assistant reply or record token usage. The returned `message_id` is a
    /// freshly generated UUID, not a database row id. (Presumably the
    /// consumer persists the result once the stream finishes — confirm at
    /// call sites.)
    ///
    /// # Errors
    ///
    /// Propagates any error from `prepare_message_context` and returns
    /// [`AgentError::Provider`] if the provider fails to open the stream.
    pub async fn send_message_streaming(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
    ) -> Result<AgentStreamResponse> {
        // Prepare message context (common setup logic)
        let (model_name, request, _message_service, _session_service) = self
            .prepare_message_context(session_id, user_message, model)
            .await?;

        // Add streaming flag to request
        let request = request.with_streaming();

        // Get streaming response from provider (session-scoped so a
        // concurrent /models pick on a different pane can't hijack).
        let provider = self.provider_for_session(session_id);
        let stream = provider
            .stream(request)
            .await
            .map_err(AgentError::Provider)?;

        Ok(AgentStreamResponse {
            session_id,
            message_id: Uuid::new_v4(),
            stream,
            model: model_name,
        })
    }

    /// Send a message with automatic tool execution (TUI channel).
    ///
    /// Equivalent to [`Self::send_message_with_tools_and_mode`] with no
    /// cancellation token.
    pub async fn send_message_with_tools(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
    ) -> Result<AgentResponse> {
        self.send_message_with_tools_and_mode(session_id, user_message, model, None)
            .await
    }

    /// Shim: send with tools + optional cancellation token (TUI channel).
    ///
    /// Delegates to `run_tool_loop` with no per-call callback overrides (so
    /// the service-level callbacks apply), channel `"tui"`, and no chat id.
    pub async fn send_message_with_tools_and_mode(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
        cancel_token: Option<CancellationToken>,
    ) -> Result<AgentResponse> {
        self.run_tool_loop(
            session_id,
            user_message,
            model,
            cancel_token,
            None,
            None,
            "tui",
            None,
        )
        .await
    }

    /// Send a message with per-call callback overrides and channel routing.
    ///
    /// `override_approval_callback` and `override_progress_callback` take
    /// precedence over the service-level callbacks (used by Telegram, Discord, etc.).
    /// Pass `None` to fall back to the service-level callback.
    ///
    /// `channel` and `channel_chat_id` identify the originating channel for
    /// crash recovery routing.
    // Mirrors `run_tool_loop`'s parameter list one-to-one; bundling the
    // arguments into a struct would break existing channel call sites.
    #[allow(clippy::too_many_arguments)]
    pub async fn send_message_with_tools_and_callback(
        &self,
        session_id: Uuid,
        user_message: String,
        model: Option<String>,
        cancel_token: Option<CancellationToken>,
        override_approval_callback: Option<ApprovalCallback>,
        override_progress_callback: Option<ProgressCallback>,
        channel: &str,
        channel_chat_id: Option<&str>,
    ) -> Result<AgentResponse> {
        self.run_tool_loop(
            session_id,
            user_message,
            model,
            cancel_token,
            override_approval_callback,
            override_progress_callback,
            channel,
            channel_chat_id,
        )
        .await
    }
}