1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{oneshot, Mutex, Notify};
use tracing::{info, warn};
use crate::claude;
use crate::config::Config;
use crate::handlers::chat::{CallbackConfig, RequestMetadata};
use crate::tracker::RequestTracker;
use crate::voice_session::VoiceSessionTracker;
/// A single request waiting in the [`Queue`] to be processed by the worker task.
///
/// The worker consumes the request, invokes Claude with `prompt`, optionally
/// routes the answer through `callback` or an active voice call, and finally
/// reports back to the submitter through `respond`.
pub struct QueuedRequest {
/// Name of the channel the request arrived on. The worker compares it against
/// `"voice"` to decide on voice-session tracking and cross-channel routing.
pub channel: String,
/// Identifier of the sender; used as the key for voice-session lookups and in logs.
pub sender: String,
/// Per-request metadata. The worker reads `call_sid`, `discord_channel_id`
/// and `workflow_id` from it.
pub metadata: RequestMetadata,
/// Optional out-of-band delivery. The worker dispatches on its `callback_type`
/// (`"discord"` or `"webhook"`); `None` means no callback is attempted.
pub callback: Option<CallbackConfig>,
/// Text handed to `claude::invoke`.
pub prompt: String,
/// Message text recorded with the request tracker when processing starts.
pub original_message: String,
/// One-shot channel on which the worker sends the response text, or a short
/// acknowledgement when the response was injected into a voice call instead.
pub respond: oneshot::Sender<String>,
}
/// Priority-aware FIFO queue. Supports normal `send` (back of queue)
/// and `send_priority` (front of queue) for cross-channel conversation merging.
///
/// Cloning is cheap: clones share the same underlying deque and notifier
/// through `Arc`, so a clone handed to the worker sees everything producers enqueue.
#[derive(Clone)]
pub struct Queue {
// Pending requests, guarded by an async (tokio) mutex; the front is processed next.
inner: Arc<Mutex<VecDeque<QueuedRequest>>>,
// Signalled after every enqueue so a receiver waiting in `recv` re-checks the deque.
notify: Arc<Notify>,
}
impl Queue {
    /// Creates an empty queue with room for 64 requests pre-allocated.
    fn new() -> Self {
        let pending = VecDeque::with_capacity(64);
        Self {
            inner: Arc::new(Mutex::new(pending)),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Appends `req` to the back of the queue (regular FIFO order) and wakes
    /// the receiver.
    pub async fn send(&self, req: QueuedRequest) {
        {
            let mut pending = self.inner.lock().await;
            pending.push_back(req);
        }
        // The lock is released before signalling, as in a plain one-liner push.
        self.notify.notify_one();
    }

    /// Inserts `req` at the front of the queue so it is the next request
    /// taken, then wakes the receiver.
    pub async fn send_priority(&self, req: QueuedRequest) {
        info!(
            "[{}] sender={} Priority enqueue (cross-channel merge)",
            req.channel, req.sender
        );
        {
            let mut pending = self.inner.lock().await;
            pending.push_front(req);
        }
        self.notify.notify_one();
    }

    /// Takes the request at the front of the queue, waiting for a
    /// notification whenever the queue is empty.
    async fn recv(&self) -> QueuedRequest {
        loop {
            // The guard is a temporary of this statement, so the lock is not
            // held while waiting for the next notification.
            let next = self.inner.lock().await.pop_front();
            match next {
                Some(req) => return req,
                None => self.notify.notified().await,
            }
        }
    }
}
/// Creates a new [`Queue`], starts the background worker task that drains it,
/// and returns the producer-side handle.
///
/// The worker receives its own clone of the queue together with `config`,
/// `tracker` and `voice_sessions`; the task handle is not kept, so the
/// worker runs detached.
pub fn spawn(
    config: Config,
    tracker: RequestTracker,
    voice_sessions: VoiceSessionTracker,
) -> Queue {
    let queue = Queue::new();
    tokio::spawn(worker(queue.clone(), config, tracker, voice_sessions));
    queue
}
async fn worker(
queue: Queue,
config: Config,
tracker: RequestTracker,
voice_sessions: VoiceSessionTracker,
) {
let mut session_id: Option<String> = None;
let mut last_used = Instant::now();
let timeout = Duration::from_secs(config.session_ttl_secs);
let http_client = reqwest::Client::new();
loop {
let req = queue.recv().await;
// Check idle timeout
if last_used.elapsed() > timeout && session_id.is_some() {
info!("Session expired after idle timeout, starting fresh");
session_id = None;
}
// Track voice sessions: if this is a voice request, register/refresh
if req.channel == "voice" {
if let Some(call_sid) = &req.metadata.call_sid {
voice_sessions.touch(&req.sender, call_sid).await;
}
}
let request_id = tracker
.start(&req.channel, &req.sender, &req.original_message)
.await;
let self_doc = config
.self_path
.as_deref()
.and_then(|path| std::fs::read_to_string(path).ok());
let response = claude::invoke(
&config.claude_bin,
&req.prompt,
&config.home,
session_id.as_deref(),
self_doc.as_deref(),
)
.await;
tracker.complete(request_id, &response.text).await;
if let Some(sid) = &response.session_id {
session_id = Some(sid.clone());
}
last_used = Instant::now();
let truncated = if response.text.len() > 120 {
format!("{}...", &response.text[..120])
} else {
response.text.clone()
};
info!(
"[{}] sender={} Response: {truncated}",
req.channel, req.sender
);
// Cross-channel voice routing: if this request came from a non-voice
// channel and the sender has an active voice call, inject the response
// into the call instead of returning it on the original channel.
let mut injected = false;
if req.channel != "voice" {
if let Some(call_sid) = voice_sessions.active_call_sid(&req.sender).await {
if let Some(ref voice_url) = config.voice_echo_url {
info!(
"[{}] sender={} Routing response to active voice call {}",
req.channel, req.sender, call_sid
);
let inject_url = format!("{}/api/inject", voice_url.trim_end_matches('/'));
let mut inject_req = http_client.post(&inject_url).json(&serde_json::json!({
"call_sid": call_sid,
"text": &response.text,
}));
if let Some(ref token) = config.voice_echo_token {
inject_req = inject_req.bearer_auth(token);
}
match inject_req.send().await {
Ok(resp) if resp.status().is_success() => {
info!("[{}] Response injected into voice call", req.channel);
injected = true;
}
Ok(resp) => {
warn!(
"[{}] Voice inject failed (HTTP {}), falling back to original channel",
req.channel,
resp.status()
);
}
Err(e) => {
warn!(
"[{}] Voice inject request failed: {e}, falling back to original channel",
req.channel
);
}
}
}
}
}
// Route response via callback if configured
if let Some(cb) = &req.callback {
match cb.callback_type.as_str() {
"discord" => {
if !injected {
if let Some(channel_id) = &req.metadata.discord_channel_id {
if let Some(ref token) = config.discord_bot_token {
let url = format!(
"https://discord.com/api/v10/channels/{}/messages",
channel_id
);
for chunk in chunk_text(&response.text, 2000) {
let payload = serde_json::json!({ "content": chunk });
match http_client
.post(&url)
.header("Authorization", format!("Bot {}", token))
.json(&payload)
.send()
.await
{
Ok(resp) if resp.status().is_success() => {}
Ok(resp) => {
warn!(
"[{}] Discord callback failed (HTTP {})",
req.channel,
resp.status()
);
}
Err(e) => {
warn!("[{}] Discord callback error: {e}", req.channel);
}
}
}
info!("[{}] Response delivered via Discord callback", req.channel);
} else {
warn!(
"[{}] Discord callback requested but no bot token configured",
req.channel
);
}
} else {
warn!(
"[{}] Discord callback requested but no channel_id in metadata",
req.channel
);
}
}
}
"webhook" => {
if let Some(url) = &cb.url {
let payload = serde_json::json!({
"response": &response.text,
"channel": &req.channel,
"sender": &req.sender,
"metadata": {
"call_sid": &req.metadata.call_sid,
"discord_channel_id": &req.metadata.discord_channel_id,
"workflow_id": &req.metadata.workflow_id,
}
});
if let Err(e) = http_client.post(url).json(&payload).send().await {
warn!("Callback webhook failed: {e}");
}
}
}
other => {
warn!("[{}] Unknown callback type: {other}", req.channel);
}
}
}
// Send response back via oneshot. If injected into voice, send
// a brief ack instead of the full response.
if injected {
let _ = req.respond.send("Responding on call.".to_string());
} else {
let _ = req.respond.send(response.text);
}
}
}
/// Split `text` into consecutive chunks of at most `max_len` bytes, never
/// cutting through a UTF-8 character.
///
/// Concatenating the returned slices reproduces `text`; empty input yields no
/// chunks. When the next character alone is wider than `max_len` (which
/// includes `max_len == 0`), that character is emitted as a chunk of its own
/// so the function always makes progress instead of looping forever on empty
/// chunks.
fn chunk_text(text: &str, max_len: usize) -> Vec<&str> {
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < text.len() {
        // Saturate so a huge `max_len` cannot overflow the offset arithmetic.
        let limit = start.saturating_add(max_len);
        let end = if limit >= text.len() {
            text.len()
        } else {
            // Walk back to the nearest char boundary at or below `limit`.
            // `start` is itself a boundary, so this never goes below it.
            let mut boundary = limit;
            while !text.is_char_boundary(boundary) {
                boundary -= 1;
            }
            if boundary > start {
                boundary
            } else {
                // No whole character fits in the budget: take exactly one.
                start + text[start..].chars().next().map_or(1, char::len_utf8)
            }
        };
        chunks.push(&text[start..end]);
        start = end;
    }
    chunks
}