car-server-core 0.33.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
//! The `agent.chat` harness — makes the assistant a real conversational agent.
//!
//! [`AssistantService`] holds one runtime + a per-`session_id` conversation
//! thread and turns each `agent.chat` request into a loop run, streaming
//! `agent.chat.event` payloads as they're produced. It is transport-agnostic:
//! the caller supplies an async `emit` sink (the `--serve` path forwards it to
//! `DaemonClient::notify`), so the same service is unit-testable without a live
//! daemon. This is the reusable piece that closes the harness gap — until now
//! only bespoke agents (Milo) implemented the agent side of `agent.chat`.
//!
//! Streaming discipline (per `docs/host-protocol.md`): the loop's synchronous
//! `emit` writes into an unbounded channel drained by a dedicated task that
//! awaits `emit`, so a slow downstream never blocks the loop and the ack is
//! never delayed behind token production. The channel applies no backpressure:
//! events queue in memory until the sink catches up.

use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;

use car_engine::Runtime;
use car_inference::tasks::generate::{ContentBlock, Message};
use serde_json::{json, Value};
use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex};

use super::agent_loop::{
    run_assistant_loop_cancellable, ApprovalDecision, ApprovalGate, AssistantEvent,
};
use super::AssistantConfig;
use crate::coder::native_loop::TurnGenerator;

/// How long a chat turn waits for a host approval before treating it as declined.
///
/// 300 seconds, i.e. five minutes.
const APPROVAL_TIMEOUT: Duration = Duration::from_secs(300);

/// A conversational assistant over one runtime, multiplexed by `session_id`.
///
/// All per-session state lives behind interior mutability, so every method
/// takes `&self`.
pub struct AssistantService {
    /// Turn generator handed to the assistant loop on every turn.
    generator: Arc<dyn TurnGenerator>,
    /// Runtime handed to the assistant loop on every turn.
    runtime: Arc<Runtime>,
    /// Loop configuration, passed by reference to every turn.
    cfg: AssistantConfig,
    /// System prompt seeded as the first message of every new thread.
    system: String,
    /// Per-session conversation threads (multi-turn continuity). Async mutex:
    /// it is locked from within the async turn handler.
    threads: AsyncMutex<HashMap<String, Vec<Message>>>,
    /// Per-session cancellation flags, set by [`Self::cancel`]. A flag is
    /// registered at the start of a turn and removed when the turn finishes.
    cancels: StdMutex<HashMap<String, Arc<AtomicBool>>>,
    /// Pending approvals awaiting a host decision, keyed by approval id. Resolved
    /// by [`Self::resolve_approval`] (driven by the `agent.chat.approve` call).
    /// Wrapped in an `Arc` so each turn's approval gate can share the map.
    approvals: Arc<StdMutex<HashMap<String, oneshot::Sender<bool>>>>,
}

impl AssistantService {
    /// Build a service around `generator` and `runtime`.
    ///
    /// `cfg` is handed to the assistant loop on every turn and `system` becomes
    /// the first message of each newly created session thread. The service
    /// starts with no threads, no cancel flags, and no pending approvals.
    pub fn new(
        generator: Arc<dyn TurnGenerator>,
        runtime: Arc<Runtime>,
        cfg: AssistantConfig,
        system: String,
    ) -> Self {
        // Per-session state is created empty and filled lazily by `handle_turn`.
        let threads = AsyncMutex::new(HashMap::new());
        let cancels = StdMutex::new(HashMap::new());
        let approvals = Arc::new(StdMutex::new(HashMap::new()));

        Self {
            generator,
            runtime,
            cfg,
            system,
            threads,
            cancels,
            approvals,
        }
    }

    /// Signal the session's in-flight turn to stop before its next model call.
    ///
    /// A no-op when the session has no registered turn (or the flag table's
    /// lock is poisoned).
    pub fn cancel(&self, session_id: &str) {
        // Grab a handle to the flag, if any, then raise it.
        let flag = self
            .cancels
            .lock()
            .ok()
            .and_then(|flags| flags.get(session_id).cloned());
        if let Some(flag) = flag {
            flag.store(true, Ordering::Relaxed);
        }
    }

    /// Resolve a pending approval (from an `agent.chat.approve` reverse-call).
    ///
    /// Removes the waiter registered under `approval_id` and hands it the
    /// decision. Returns true only if such a waiter existed and was still
    /// listening; an unknown id, a waiter that already gave up, or a poisoned
    /// lock all yield false.
    pub fn resolve_approval(&self, approval_id: &str, approved: bool) -> bool {
        // Take the sender out of the table first so the lock is released
        // before the decision is delivered.
        let waiter = match self.approvals.lock() {
            Ok(mut pending) => pending.remove(approval_id),
            Err(_) => None,
        };
        waiter.is_some_and(|tx| tx.send(approved).is_ok())
    }

    /// Run one chat turn for `session_id`, streaming `agent.chat.event` payloads
    /// (each already stamped with `session_id`) through `emit`. Returns when the
    /// turn reaches a terminal state and every queued event has been handed to
    /// `emit`. `attachments` are image `ContentBlock`s
    /// (`image_base64`/`image_url`) forwarded to a vision model on the first
    /// model call; entries that fail to parse or are not image blocks are
    /// dropped. The caller should have already acked the `agent.chat` request
    /// and spawned this on its own task.
    ///
    /// # Overlapping turns
    ///
    /// Turns on the same session are not serialized. If two overlap, the most
    /// recently started one owns the session's cancel flag, and the thread is
    /// persisted last-writer-wins. A finishing turn only unregisters the cancel
    /// flag if it is still its own, so it never strips a newer turn of its
    /// ability to be cancelled.
    pub async fn handle_turn<E, Fut>(
        &self,
        session_id: &str,
        prompt: &str,
        attachments: Option<Vec<Value>>,
        emit: E,
    ) where
        E: Fn(Value) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        // Image attachments → ContentBlocks for the vision path. The daemon
        // already validated the shape; keep only image blocks.
        let images: Vec<ContentBlock> = attachments
            .unwrap_or_default()
            .into_iter()
            .filter_map(|a| serde_json::from_value::<ContentBlock>(a).ok())
            .filter(|c| {
                matches!(
                    c,
                    ContentBlock::ImageBase64 { .. } | ContentBlock::ImageUrl { .. }
                )
            })
            .collect();

        // Fresh cancel flag per turn.
        let cancel = Arc::new(AtomicBool::new(false));
        if let Ok(mut g) = self.cancels.lock() {
            g.insert(session_id.to_string(), cancel.clone());
        }

        // Load (or seed) the thread; append the user turn. Clone out so the loop
        // doesn't hold the threads lock across its awaits.
        let mut messages = {
            let mut g = self.threads.lock().await;
            g.entry(session_id.to_string())
                .or_insert_with(|| {
                    vec![Message::System {
                        content: self.system.clone(),
                    }]
                })
                .clone()
        };
        messages.push(Message::User {
            content: prompt.to_string(),
        });

        // Stream events through a channel drained by a dedicated task, so the
        // loop's synchronous emit never blocks on the async sink.
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
        let drain = tokio::spawn(async move {
            while let Some(v) = rx.recv().await {
                emit(v).await;
            }
        });

        let sid = session_id.to_string();
        // `tx_term` streams the synthesized terminal event after the loop; the
        // loop's emit closure and the gate each hold their own clone. Dropping
        // all three closes the channel so the drain task ends.
        let tx_term = tx.clone();
        let gate = ChatApprovalGate {
            session_id: sid.clone(),
            tx: tx.clone(),
            approvals: self.approvals.clone(),
            counter: AtomicU64::new(0),
        };
        let outcome = run_assistant_loop_cancellable(
            &*self.generator,
            &self.runtime,
            &self.cfg,
            &mut messages,
            &cancel,
            Some(&gate),
            if images.is_empty() {
                None
            } else {
                Some(images.as_slice())
            },
            {
                let sid = sid.clone();
                move |ev| {
                    if let Some(mut payload) = event_to_wire(ev) {
                        payload["session_id"] = json!(sid);
                        let _ = tx.send(payload);
                    }
                }
            },
        )
        .await;
        drop(gate); // release the gate's channel clone

        // The loop emits `done` on a clean finish and `error` on an inference
        // failure; for a turn cap or a cancel it emits no terminal event, so
        // synthesize one here — a host must always see a terminal.
        match outcome.status {
            "max_turns" => {
                let _ = tx_term
                    .send(json!({ "kind": "done", "text": outcome.summary, "session_id": sid }));
            }
            "cancelled" => {
                let _ = tx_term
                    .send(json!({ "kind": "error", "error": "cancelled", "session_id": sid }));
            }
            _ => {}
        }

        // Close the last sender and wait for the drain task so every event has
        // reached `emit` before this function returns.
        drop(tx_term);
        let _ = drain.await;

        // Persist the thread (unless cancelled mid-turn, where the partial
        // assistant/tool messages would leave a dangling exchange).
        if outcome.status != "cancelled" {
            let mut g = self.threads.lock().await;
            g.insert(session_id.to_string(), messages);
        }

        // Unregister this turn's cancel flag — but only if it is still the one
        // registered. An overlapping turn on the same session may have replaced
        // it with its own flag, and removing that would make the newer turn
        // impossible to cancel.
        if let Ok(mut g) = self.cancels.lock() {
            let still_ours = matches!(g.get(session_id), Some(f) if Arc::ptr_eq(f, &cancel));
            if still_ours {
                g.remove(session_id);
            }
        }
    }
}

/// The chat-surface approval gate: emit an `approval_pending` event and park on
/// a oneshot resolved by [`AssistantService::resolve_approval`] (driven by the
/// host's `agent.chat.approve` reverse-call). Times out to "declined".
///
/// One gate is built per chat turn.
struct ChatApprovalGate {
    /// Session the turn belongs to; stamped on emitted events and used as the
    /// prefix of generated approval ids.
    session_id: String,
    /// Sender half of the turn's event channel, used to stream
    /// `approval_pending` events to the host.
    tx: mpsc::UnboundedSender<Value>,
    /// The service's pending-approval table, shared so that
    /// `resolve_approval` can find the waiter registered by this gate.
    approvals: Arc<StdMutex<HashMap<String, oneshot::Sender<bool>>>>,
    /// Number of approval requests issued by this gate so far.
    counter: AtomicU64,
}

#[async_trait::async_trait]
impl ApprovalGate for ChatApprovalGate {
    /// Ask the host to approve running `tool` with `params`.
    ///
    /// Registers a oneshot waiter under a fresh approval id, streams an
    /// `approval_pending` event carrying that id, then waits up to
    /// [`APPROVAL_TIMEOUT`] for the host's decision.
    ///
    /// Returns `Approved` only on an explicit approval. An explicit rejection,
    /// a timeout, a dropped waiter, or an undeliverable request all yield
    /// `Denied` with a short reason.
    ///
    /// Approval ids are unique for the lifetime of the process, not just within
    /// a turn: a stale `agent.chat.approve` aimed at an earlier (e.g. timed-out)
    /// request can therefore never resolve a later request for a different
    /// tool call.
    async fn request(&self, tool: &str, params: &Value) -> ApprovalDecision {
        // Process-wide sequence. The per-gate counter restarts at zero every
        // turn, so on its own it would hand out the same id turn after turn.
        static APPROVAL_SEQ: AtomicU64 = AtomicU64::new(0);

        // `n` is the request's ordinal within this turn; `seq` disambiguates
        // across turns and sessions.
        let n = self.counter.fetch_add(1, Ordering::Relaxed);
        let seq = APPROVAL_SEQ.fetch_add(1, Ordering::Relaxed);
        let approval_id = format!("{}-appr-{n}-{seq}", self.session_id);

        let (otx, orx) = oneshot::channel();
        if let Ok(mut g) = self.approvals.lock() {
            g.insert(approval_id.clone(), otx);
        }

        let delivered = self
            .tx
            .send(json!({
                "kind": "approval_pending",
                "approval_id": approval_id,
                "tool": tool,
                "params": params,
                "session_id": self.session_id,
            }))
            .is_ok();
        if !delivered {
            // The event channel is closed, so the host can never see (or
            // answer) this request. Deny now instead of parking for the full
            // timeout.
            if let Ok(mut g) = self.approvals.lock() {
                g.remove(&approval_id);
            }
            return ApprovalDecision::Denied("approval request could not be delivered".into());
        }

        match tokio::time::timeout(APPROVAL_TIMEOUT, orx).await {
            Ok(Ok(true)) => ApprovalDecision::Approved,
            Ok(Ok(false)) => ApprovalDecision::Denied("declined by user".into()),
            _ => {
                // Timed out or the sender dropped — clean up and treat as denied.
                if let Ok(mut g) = self.approvals.lock() {
                    g.remove(&approval_id);
                }
                ApprovalDecision::Denied("approval timed out".into())
            }
        }
    }
}

/// Translate a loop event into an `agent.chat.event` wire payload. Tool results
/// are internal to the loop and not surfaced as their own kind (the model's
/// subsequent text conveys them).
fn event_to_wire(ev: AssistantEvent) -> Option<Value> {
    match ev {
        AssistantEvent::Text(t) => Some(json!({ "kind": "token", "delta": t })),
        AssistantEvent::ToolCall { name, params } => {
            Some(json!({ "kind": "tool_call", "tool": name, "params": params }))
        }
        AssistantEvent::ToolResult { .. } => None,
        AssistantEvent::Done { text } => Some(json!({ "kind": "done", "text": text })),
        AssistantEvent::Error(e) => Some(json!({ "kind": "error", "error": e })),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::assistant::executor::GeneralExecutor;
    use async_trait::async_trait;
    use car_engine::{LocalSubstrate, Substrate, ToolExecutor};
    use car_inference::{GenerateRequest, InferenceEngine, InferenceResult};
    use std::sync::atomic::AtomicUsize;

    /// Build a scripted model response with the given `text` and `tool_calls`;
    /// the remaining fields are fixed placeholders. Panics if the JSON does not
    /// deserialize into an `InferenceResult`.
    fn turn(text: &str, tool_calls: Value) -> InferenceResult {
        let raw = json!({
            "text": text,
            "tool_calls": tool_calls,
            "trace_id": "t",
            "model_used": "scripted",
            "latency_ms": 0,
        });
        serde_json::from_value(raw).unwrap()
    }

    /// A `TurnGenerator` that replays a fixed list of responses in order.
    struct Script {
        /// Responses to hand out, one per `generate` call.
        turns: Vec<InferenceResult>,
        /// Index of the next response to return.
        cursor: AtomicUsize,
    }
    #[async_trait]
    impl TurnGenerator for Script {
        /// Return the next scripted response, ignoring the request; fails with
        /// `"exhausted"` once the script has run out.
        async fn generate(&self, _r: GenerateRequest) -> Result<InferenceResult, String> {
            let idx = self.cursor.fetch_add(1, Ordering::SeqCst);
            match self.turns.get(idx) {
                Some(scripted) => Ok(scripted.clone()),
                None => Err("exhausted".into()),
            }
        }
    }

    /// Assemble a test runtime: a local substrate, a `GeneralExecutor` built
    /// over that substrate and `dir`, and a default-configured inference
    /// engine, with the agent basics registered.
    async fn runtime(dir: &std::path::Path) -> Arc<Runtime> {
        let substrate: Arc<dyn Substrate> = Arc::new(LocalSubstrate::new());
        // NOTE(review): the meaning of the trailing `true` flag is not visible
        // from this file — confirm against `GeneralExecutor::new`.
        let executor: Arc<dyn ToolExecutor> =
            Arc::new(GeneralExecutor::new(Arc::clone(&substrate), dir, true));

        let built = Runtime::new()
            .with_inference(Arc::new(InferenceEngine::new(Default::default())))
            .with_executor(executor)
            .with_substrate(substrate);
        built.register_agent_basics().await;

        Arc::new(built)
    }

    /// A scripted two-step turn (one `calculate` tool call, then a final
    /// answer) streams session-stamped events, ends in `done`, and persists the
    /// session thread.
    #[tokio::test]
    async fn chat_turn_streams_tokens_and_done() {
        let dir = tempfile::tempdir().unwrap();
        let rt = runtime(dir.path()).await;
        // Model call 1 asks for the calculate tool; model call 2 gives the answer.
        let generator: Arc<dyn TurnGenerator> = Arc::new(Script {
            turns: vec![
                turn(
                    "let me compute",
                    json!([{ "id": "c1", "name": "calculate", "arguments": { "expression": "2+2" } }]),
                ),
                turn("It's 4.", json!([])),
            ],
            cursor: AtomicUsize::new(0),
        });
        let cfg = AssistantConfig {
            model: Some("scripted".into()),
            max_turns: 4,
            tools: GeneralExecutor::tool_defs(),
            gated_tools: Vec::new(),
            approval_policy: None,
        };
        let svc = AssistantService::new(generator, rt, cfg, "sys".into());

        // Collect every emitted event into a shared vector.
        let events = Arc::new(StdMutex::new(Vec::<Value>::new()));
        let ev2 = events.clone();
        svc.handle_turn("s1", "what is 2+2?", None, move |v| {
            let ev = ev2.clone();
            async move {
                ev.lock().unwrap().push(v);
            }
        })
        .await;

        let got = events.lock().unwrap().clone();
        // Every event carries the session id.
        assert!(got.iter().all(|e| e["session_id"] == "s1"));
        // A tool_call for calculate was streamed.
        assert!(got
            .iter()
            .any(|e| e["kind"] == "tool_call" && e["tool"] == "calculate"));
        // The last event is the terminal done with the final text.
        let last = got.last().unwrap();
        assert_eq!(last["kind"], "done");
        assert_eq!(last["text"], "It's 4.");

        // The thread was persisted under the session id after the turn: it
        // holds at least the system prompt, the user message and an assistant
        // message, so a later turn on "s1" would continue from it.
        let thread_len = svc.threads.lock().await.get("s1").map(|m| m.len()).unwrap();
        assert!(thread_len >= 3, "thread should persist across the turn");
    }

    /// The daemon's `try_forward_agent_chat_event` forwards a chat event iff it
    /// is a notification carrying `params.session_id`; the host then dispatches
    /// on `kind`. This asserts every event we emit across a full turn (text,
    /// tool_call, terminal) satisfies that contract — the wire compatibility the
    /// live `agents.chat` → `agent.chat.event` path depends on.
    #[tokio::test]
    async fn every_chat_event_is_forwardable_by_the_daemon() {
        let dir = tempfile::tempdir().unwrap();
        let rt = runtime(dir.path()).await;
        // Same shape as the streaming test: one tool call, then a final answer.
        let generator: Arc<dyn TurnGenerator> = Arc::new(Script {
            turns: vec![
                turn(
                    "let me compute",
                    json!([{ "id": "c1", "name": "calculate", "arguments": { "expression": "1+1" } }]),
                ),
                turn("It's 2.", json!([])),
            ],
            cursor: AtomicUsize::new(0),
        });
        let cfg = AssistantConfig {
            model: Some("scripted".into()),
            max_turns: 4,
            tools: GeneralExecutor::tool_defs(),
            gated_tools: Vec::new(),
            approval_policy: None,
        };
        let svc = AssistantService::new(generator, rt, cfg, "sys".into());
        // Collect every emitted event into a shared vector.
        let events = Arc::new(StdMutex::new(Vec::<Value>::new()));
        let ev2 = events.clone();
        svc.handle_turn("sess-42", "1+1?", None, move |v| {
            let ev = ev2.clone();
            async move {
                ev.lock().unwrap().push(v);
            }
        })
        .await;

        // The complete set of event kinds this module can put on the wire.
        const KNOWN_KINDS: [&str; 5] = ["token", "tool_call", "approval_pending", "done", "error"];
        let got = events.lock().unwrap().clone();
        assert!(!got.is_empty());
        for e in &got {
            // Forwarding precondition: session_id present and correct.
            assert_eq!(
                e.get("session_id").and_then(Value::as_str),
                Some("sess-42"),
                "every event must carry its session_id: {e}"
            );
            // Host-dispatchable: a known kind.
            let kind = e.get("kind").and_then(Value::as_str).unwrap_or("");
            assert!(KNOWN_KINDS.contains(&kind), "unknown event kind: {e}");
        }
        // The stream ends in a terminal `done`.
        assert_eq!(got.last().unwrap()["kind"], "done");
    }

    /// With `write_file` gated, the turn emits an `approval_pending` event and
    /// parks; once the test approves it through `resolve_approval`, the turn
    /// resumes and the file is written.
    #[tokio::test]
    async fn chat_gated_write_emits_approval_and_resumes_on_approve() {
        let dir = tempfile::tempdir().unwrap();
        let rt = runtime(dir.path()).await;
        // Model call 1 requests the gated write; model call 2 wraps up.
        let generator: Arc<dyn TurnGenerator> = Arc::new(Script {
            turns: vec![
                turn(
                    "",
                    json!([{ "id": "w1", "name": "write_file", "arguments": { "path": "z.txt", "content": "zephyr" } }]),
                ),
                turn("done", json!([])),
            ],
            cursor: AtomicUsize::new(0),
        });
        let cfg = AssistantConfig {
            model: Some("scripted".into()),
            max_turns: 4,
            tools: GeneralExecutor::tool_defs(),
            gated_tools: vec!["write_file".into()],
            approval_policy: None,
        };
        // Shared via `Arc` so the turn can run on a spawned task while this
        // task resolves the approval.
        let svc = Arc::new(AssistantService::new(generator, rt, cfg, "sys".into()));

        let events = Arc::new(StdMutex::new(Vec::<Value>::new()));
        let ev2 = events.clone();

        // Drive the turn and, concurrently, approve the first pending request.
        let svc_run = svc.clone();
        let turn_task = tokio::spawn(async move {
            svc_run
                .handle_turn("s1", "write z.txt", None, move |v| {
                    let ev = ev2.clone();
                    async move {
                        ev.lock().unwrap().push(v);
                    }
                })
                .await;
        });

        // Poll for the approval_pending event, then approve it. Bounded at
        // 200 polls of 10 ms (about two seconds) so a missing event fails the
        // test instead of hanging it.
        let approved = {
            let mut ok = false;
            for _ in 0..200 {
                let id = events
                    .lock()
                    .unwrap()
                    .iter()
                    .find(|e| e["kind"] == "approval_pending")
                    .and_then(|e| e["approval_id"].as_str().map(String::from));
                if let Some(id) = id {
                    // The id is treated as opaque: echo back exactly what was emitted.
                    assert!(svc.resolve_approval(&id, true));
                    ok = true;
                    break;
                }
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
            ok
        };
        assert!(approved, "an approval_pending event should have been emitted");
        turn_task.await.unwrap();

        // The write ran after approval.
        assert_eq!(
            std::fs::read_to_string(dir.path().join("z.txt")).unwrap(),
            "zephyr"
        );
    }
}