newt-acp-worker 0.6.2

Newt-Agent ACP worker — Agent Client Protocol server for drake-foreman dispatch
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
//! ACP server — minimal Agent Client Protocol implementation over stdio.
//!
//! Speaks newline-delimited JSON-RPC 2.0 so `drake-foreman` can dispatch
//! coding goals. Each ACP session pairs a workspace path with an optional
//! model override; the `prompt` handler runs inference against the
//! configured backend and either applies any unified diff the model
//! returns (newt-flat path) or lets the `newt-coder` plugin write
//! whole-file edits into the workspace (newt-coder path).
//!
//! Per workspace memory:
//! - Worker ONLY edits files. Never `git add`/`git commit`/`git push`.
//! - Empty `git diff` post-turn surfaces as `empty_diff: true` in the
//!   reply (the CLI binary translates that into a non-zero exit).
//! - `TaskReply.model_id` is mandatory.

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use newt_core::SessionId;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Mutex;

/// State the ACP server remembers for one session.
///
/// A `Session` is created by `new_session` and later looked up by id in
/// `set_session_model` and `prompt`.
#[derive(Debug, Clone)]
pub struct Session {
    /// Workspace root this session operates on.
    pub workspace_path: PathBuf,
    /// Model name recorded by `set_session_model`; `None` until one is set.
    pub model_override: Option<String>,
    /// `true` when this session is routed through the `newt-coder` plugin
    /// (whole-file emit plus server-side diff normalization) rather than
    /// the legacy newt-flat path.
    ///
    /// Enabled for a single session by passing `coder: true` to
    /// `new_session`, or for every session by exporting `NEWT_CODER=1`.
    /// The failure modes this addresses are catalogued in
    /// `~/workspaces/knowledge/board/drake/2026-05-29_newt-coder-failure-mode-taxonomy.md`.
    pub coder_enabled: bool,
}

/// Newline-delimited JSON-RPC server implementing the minimal ACP surface.
///
/// Owns the shared inference backend together with an in-memory table of
/// open sessions keyed by [`SessionId`], guarded by an async mutex.
pub struct AcpServer {
    sessions: Arc<Mutex<HashMap<SessionId, Session>>>,
    backend: Arc<dyn newt_inference::InferenceBackend>,
}

/// Structured reply for one `prompt` turn.
///
/// # Contract
///
/// Per workspace memory `feedback_drake_patch_not_prose` and
/// `feedback_empty_diff_is_a_crash`:
///
/// - `model_id` is **mandatory**. It is a non-Option `String` without
///   `serde(default)`, so the field cannot be silently omitted from the
///   wire format and deserializing a payload that lacks it fails. Use
///   [`TaskReply::new`] for the validated constructor that rejects an
///   empty `model_id` — foreman uses this field to attribute the
///   patch and update the model's scorecard, so a missing id is
///   non-recoverable. Note that plain deserialization does not re-run
///   the constructor's emptiness check.
/// - `empty_diff: true` means the worker produced no real edits and
///   foreman should disqualify it pre-arbiter.
/// - `diff_applied: true` means the turn's edits landed in the
///   workspace. On the newt-flat path that is "a unified diff was found
///   in `content` and `newt_tools::apply_patch` accepted it"; on the
///   newt-coder path it is "the plugin wrote at least one file, or the
///   captured diff is non-blank".
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TaskReply {
    /// MANDATORY — the model that produced this reply.
    pub model_id: String,
    /// Assistant content: the model's reply text on the newt-flat path
    /// (typically the unified diff), or a one-line `[newt-coder] …`
    /// summary of files written on the newt-coder path.
    pub content: String,
    /// Workspace diff captured after the turn.
    pub diff: String,
    /// True if the captured diff is empty (no real changes). Computed
    /// from `diff` by [`TaskReply::new`]; a deserialized value is taken
    /// as-is.
    pub empty_diff: bool,
    /// True if the turn's edits were applied to the workspace — see the
    /// per-path meaning in the contract section of this type's docs.
    pub diff_applied: bool,
    /// Set by the newt-coder plugin: "whole_files", "unified_diff",
    /// or "prose" (the wire-stable constants in
    /// `plugins_protocol::emission_shape`). `None` when the legacy
    /// newt-flat path produced the reply; omitted from the wire when
    /// `None`, and defaulted to `None` when absent on input.
    ///
    /// Lets the foreman's scorecard distinguish failure modes T0a /
    /// T0b / T0c instead of lumping them together as "empty diff".
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub emission_shape: Option<String>,
    /// The model's first raw emission — newt-coder's
    /// `CoderRun.first_emission`, or the flat path's reply content.
    ///
    /// The eval `diff_applies` evaluator runs `git apply --check` against
    /// this (when it is diff-shaped) rather than the post-hoc captured
    /// diff, so a model that emits an unappliable diff the fuzzy worker
    /// only rescued is scored honestly (#30B). `None` for legacy payloads;
    /// omitted from the wire when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw_emission: Option<String>,
}

impl TaskReply {
    /// Validated constructor.
    ///
    /// `empty_diff` is derived from `diff` via `crate::diff::is_empty_diff`.
    /// `emission_shape` and `raw_emission` start as `None`; attach them
    /// with [`TaskReply::with_emission_shape`] and
    /// [`TaskReply::with_raw_emission`].
    ///
    /// # Errors
    ///
    /// Fails if `model_id` is empty or consists only of whitespace, so a
    /// buggy backend can't silently produce an unattributable reply.
    pub fn new(
        model_id: impl Into<String>,
        content: impl Into<String>,
        diff: impl Into<String>,
        diff_applied: bool,
    ) -> anyhow::Result<Self> {
        let model_id = model_id.into();
        // A whitespace-only id attributes the patch to nobody, exactly
        // like an empty one, so both are rejected.
        if model_id.trim().is_empty() {
            anyhow::bail!("TaskReply.model_id is mandatory and must not be empty");
        }
        let diff = diff.into();
        let empty_diff = crate::diff::is_empty_diff(&diff);
        Ok(Self {
            model_id,
            content: content.into(),
            diff,
            empty_diff,
            diff_applied,
            emission_shape: None,
            raw_emission: None,
        })
    }

    /// Builder: attach the emission shape label the newt-coder plugin
    /// produced. The legacy newt-flat path leaves this `None`.
    #[must_use]
    pub fn with_emission_shape(mut self, shape: impl Into<String>) -> Self {
        self.emission_shape = Some(shape.into());
        self
    }

    /// Builder: attach the model's first raw emission so the eval
    /// `diff_applies` evaluator can judge it with the strict oracle (#30B).
    #[must_use]
    pub fn with_raw_emission(mut self, raw: impl Into<String>) -> Self {
        self.raw_emission = Some(raw.into());
        self
    }
}

impl AcpServer {
    /// Build a new server bound to `backend`, starting with no sessions.
    pub fn new(backend: Arc<dyn newt_inference::InferenceBackend>) -> Self {
        Self {
            sessions: Arc::new(Mutex::new(HashMap::new())),
            backend,
        }
    }

    /// Run the server over stdin/stdout.
    ///
    /// # Errors
    ///
    /// Same as [`AcpServer::run`].
    pub async fn run_stdio(self) -> anyhow::Result<()> {
        self.run(tokio::io::stdin(), tokio::io::stdout()).await
    }

    /// Run the server over an arbitrary async reader/writer.
    ///
    /// Reads one JSON-RPC request per line from `reader`, skipping blank
    /// lines, and writes one response line to `writer` for every other
    /// line. Error codes follow JSON-RPC 2.0:
    ///
    /// - `-32700` Parse error: the line is not valid JSON.
    /// - `-32600` Invalid Request: there is no string `method` member.
    /// - `-32601` Method not found: `method` is not handled by this server.
    /// - `-32603` Internal error: the matched handler returned an error.
    ///
    /// A request without an `id` is still answered, with `"id": null`.
    ///
    /// # Errors
    ///
    /// Returns an error only when reading from `reader` or writing to
    /// `writer` fails. Per-request failures are reported in-band and do
    /// not stop the loop.
    pub async fn run<R, W>(self, reader: R, mut writer: W) -> anyhow::Result<()>
    where
        R: tokio::io::AsyncRead + Unpin,
        W: tokio::io::AsyncWrite + Unpin,
    {
        let mut lines = BufReader::new(reader).lines();

        while let Some(line) = lines.next_line().await? {
            if line.trim().is_empty() {
                continue;
            }

            let request: Value = match serde_json::from_str(&line) {
                Ok(v) => v,
                Err(e) => {
                    let resp = error_response(Value::Null, -32700, &format!("Parse error: {e}"));
                    write_response(&mut writer, &resp).await?;
                    continue;
                }
            };

            let id = request.get("id").cloned().unwrap_or(Value::Null);
            let params = request.get("params").cloned().unwrap_or(Value::Null);

            let response = match request.get("method").and_then(Value::as_str) {
                // `method` is required and must be a string; this also
                // covers non-object payloads such as batches or scalars,
                // which this server does not support.
                None => error_response(
                    id,
                    -32600,
                    "Invalid Request: missing or non-string `method`",
                ),
                Some(method) => match self.handle(method, params).await {
                    Some(Ok(result)) => serde_json::json!({
                        "jsonrpc": "2.0",
                        "id": id,
                        "result": result,
                    }),
                    Some(Err(e)) => error_response(id, -32603, &e.to_string()),
                    None => error_response(id, -32601, &format!("method not found: {method}")),
                },
            };

            write_response(&mut writer, &response).await?;
        }

        Ok(())
    }

    /// Dispatch one parsed request to the matching handler.
    ///
    /// Returns `None` when `method` is not implemented, so the caller can
    /// answer with JSON-RPC `-32601 Method not found` instead of a generic
    /// internal error. `Some(Err(_))` means the handler itself failed.
    async fn handle(&self, method: &str, params: Value) -> Option<anyhow::Result<Value>> {
        let result = match method {
            "initialize" => self.handle_initialize(params).await,
            "new_session" => self.handle_new_session(params).await,
            "set_session_model" => self.handle_set_session_model(params).await,
            "prompt" => self.handle_prompt(params).await,
            _ => return None,
        };
        Some(result)
    }

    /// `initialize` — return the protocol version, server info and
    /// capabilities. Params are ignored.
    async fn handle_initialize(&self, _params: Value) -> anyhow::Result<Value> {
        Ok(serde_json::json!({
            "protocolVersion": "v0.1",
            "serverInfo": {
                "name": "newt-acp-worker",
                "version": env!("CARGO_PKG_VERSION"),
            },
            "capabilities": {
                "prompting": true,
                "diff_capture": true,
            },
        }))
    }

    /// `new_session` — create a session bound to a workspace directory.
    ///
    /// Required params:
    /// - `workspace_path` — an existing directory used as the workspace root.
    ///
    /// Optional params:
    /// - `coder: true` — opt this session into the `newt-coder`
    ///   plugin (whole-file emit + server-side diff normalization).
    ///   The `NEWT_CODER=1` process-wide env opts every session in;
    ///   this per-session field is the finer-grained switch.
    ///
    /// Returns `{ "session_id": <string>, "coder": <bool> }`.
    async fn handle_new_session(&self, params: Value) -> anyhow::Result<Value> {
        let workspace_path: PathBuf = params
            .get("workspace_path")
            .and_then(|p| p.as_str())
            .map(PathBuf::from)
            .ok_or_else(|| anyhow::anyhow!("workspace_path required"))?;

        if !workspace_path.exists() {
            anyhow::bail!(
                "workspace_path does not exist: {}",
                workspace_path.display()
            );
        }
        // Patch application and diff capture both treat this path as the
        // workspace root, so a regular file is rejected up front instead
        // of failing later inside a `prompt` turn.
        if !workspace_path.is_dir() {
            anyhow::bail!(
                "workspace_path is not a directory: {}",
                workspace_path.display()
            );
        }

        let env_coder = std::env::var("NEWT_CODER")
            .map(|v| v == "1")
            .unwrap_or(false);
        let param_coder = params
            .get("coder")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);
        let coder_enabled = env_coder || param_coder;

        let session_id = SessionId::new();
        let mut sessions = self.sessions.lock().await;
        sessions.insert(
            session_id,
            Session {
                workspace_path,
                model_override: None,
                coder_enabled,
            },
        );

        Ok(serde_json::json!({
            "session_id": session_id.to_string(),
            "coder": coder_enabled,
        }))
    }

    /// `set_session_model` — record a model override on an existing session.
    ///
    /// NOTE(review): the stored `model_override` is not read by either
    /// `prompt` path in this module, so the override currently has no
    /// effect on inference.
    async fn handle_set_session_model(&self, params: Value) -> anyhow::Result<Value> {
        let session_id: SessionId = params
            .get("session_id")
            .and_then(|s| s.as_str())
            .ok_or_else(|| anyhow::anyhow!("session_id required"))?
            .parse()?;
        let model = params
            .get("model")
            .and_then(|m| m.as_str())
            .ok_or_else(|| anyhow::anyhow!("model required"))?
            .to_string();

        let mut sessions = self.sessions.lock().await;
        let session = sessions
            .get_mut(&session_id)
            .ok_or_else(|| anyhow::anyhow!("unknown session: {session_id}"))?;
        session.model_override = Some(model);

        Ok(serde_json::json!({ "ok": true }))
    }

    /// `prompt` — run one inference turn against the session's workspace.
    ///
    /// Two dispatch paths:
    ///
    /// - **newt-flat (default).** Sends the operator's prompt verbatim
    ///   with the "respond with unified diffs only" directive; if the
    ///   reply looks like a diff, tries to apply it. This is the
    ///   minimal path that hits failure mode T0b on most local Ollama
    ///   models (see the taxonomy card).
    ///
    /// - **newt-coder.** Activated when `session.coder_enabled` is
    ///   true (via `NEWT_CODER=1` env or `coder: true` on
    ///   `new_session`). Delegates to [`newt_coder::Coder`]: scans
    ///   the workspace for referenced files, injects their contents
    ///   into the prompt, asks the model for whole-file emit, and
    ///   writes the result back to the workspace. The captured
    ///   `git diff` then represents real edits the model actually
    ///   made — closing T0b.
    ///
    /// Both paths capture the post-turn `git diff` and return a
    /// [`TaskReply`] with the mandatory `model_id`, the assistant
    /// content, the diff, `empty_diff` / `diff_applied` flags, the raw
    /// first emission, and (newt-coder only) the `emission_shape` label.
    ///
    /// The session is cloned out of the map so the lock is not held
    /// across the inference call.
    async fn handle_prompt(&self, params: Value) -> anyhow::Result<Value> {
        let session_id: SessionId = params
            .get("session_id")
            .and_then(|s| s.as_str())
            .ok_or_else(|| anyhow::anyhow!("session_id required"))?
            .parse()?;
        let prompt = params
            .get("prompt")
            .and_then(|p| p.as_str())
            .ok_or_else(|| anyhow::anyhow!("prompt required"))?
            .to_string();

        let session = {
            let sessions = self.sessions.lock().await;
            sessions
                .get(&session_id)
                .cloned()
                .ok_or_else(|| anyhow::anyhow!("unknown session: {session_id}"))?
        };

        let task_reply = if session.coder_enabled {
            self.handle_prompt_coder(&session, &prompt).await?
        } else {
            self.handle_prompt_flat(&session, &prompt).await?
        };

        Ok(serde_json::to_value(task_reply)?)
    }

    /// Legacy newt-flat path: verbatim prompt + "unified diffs only"
    /// directive. Kept for callers (and the existing eval corpus) that
    /// rely on the minimal-prompt contract.
    async fn handle_prompt_flat(
        &self,
        session: &Session,
        prompt: &str,
    ) -> anyhow::Result<TaskReply> {
        // NOTE(review): `session.model_override` is not applied to this
        // request — confirm how `ChatRequest` selects a model and wire
        // the override through.
        let req = newt_inference::ChatRequest::new()
            .system("You are a coding assistant. Respond with unified diffs only.")
            .user(prompt.to_string());

        let reply = self.backend.complete(req).await?;

        // If the reply contains a unified diff, try to apply it. We
        // accept the patch unconditionally on success; on failure we
        // log and continue so the diff text still makes it back to the
        // caller for debugging.
        let diff_applied = if looks_like_unified_diff(&reply.content) {
            match newt_tools::apply_patch(&session.workspace_path, &reply.content) {
                Ok(()) => true,
                Err(e) => {
                    tracing::warn!(error = %e, "patch application failed");
                    false
                }
            }
        } else {
            false
        };

        let diff = crate::diff::capture_diff(&session.workspace_path)?;
        // Flat path has no re-prompt fallback, so the reply content IS the
        // model's first (and only) emission.
        let raw_emission = reply.content.clone();
        TaskReply::new(reply.model_id, reply.content, diff, diff_applied)
            .map(|r| r.with_raw_emission(raw_emission))
            .map_err(|e| anyhow::anyhow!("backend returned malformed reply: {e}"))
    }

    /// newt-coder path: whole-file emit + server-side diff normalization.
    /// Closes failure mode T0b on local Ollama coder models.
    async fn handle_prompt_coder(
        &self,
        session: &Session,
        prompt: &str,
    ) -> anyhow::Result<TaskReply> {
        let coder = newt_coder::Coder::new(Arc::clone(&self.backend));
        // 35b: every Coder::run dispatch is gated on a Caveats value.
        // The ACP worker has no peer cert today — that's the 35c handoff
        // (newt-mesh extracts caveats from the verified peer cert and
        // hands them in here). Until then we pass top (= the user's full
        // authority), preserving pre-35b behavior; the enforcement
        // machinery is wired so 35c only needs to swap the argument.
        let caveats = newt_core::Caveats::top();
        let run = coder
            .run(&session.workspace_path, prompt, &caveats)
            .await
            .map_err(|e| anyhow::anyhow!("newt-coder run failed: {e}"))?;

        // newt-coder already wrote any whole-file or unified-diff
        // edits to the workspace; capture the resulting real diff.
        let diff = crate::diff::capture_diff(&session.workspace_path)?;
        let diff_applied = !run.files_written.is_empty() || !diff.trim().is_empty();

        let content = format!(
            "[newt-coder] {} file(s) written via {}",
            run.files_written.len(),
            run.emission_shape,
        );

        Ok(TaskReply::new(run.model_id, content, diff, diff_applied)
            .map_err(|e| anyhow::anyhow!("newt-coder returned malformed reply: {e}"))?
            .with_emission_shape(run.emission_shape)
            .with_raw_emission(run.first_emission))
    }
}

/// Cheap pre-check: does `content` contain both unified-diff file header
/// markers (`--- ` and `+++ `), anywhere and in any order?
///
/// This only gates the call into `newt_tools::apply_patch`; that parser
/// remains the authority on whether the patch is actually valid.
fn looks_like_unified_diff(content: &str) -> bool {
    const HEADER_MARKERS: [&str; 2] = ["--- ", "+++ "];
    HEADER_MARKERS
        .iter()
        .all(|&marker| content.contains(marker))
}

/// Serialize `response` as compact JSON, terminate it with `\n`, write the
/// whole line to `writer`, and flush.
///
/// # Errors
///
/// Propagates serialization, write, or flush failures.
async fn write_response<W: tokio::io::AsyncWrite + Unpin>(
    writer: &mut W,
    response: &Value,
) -> anyhow::Result<()> {
    let mut frame = serde_json::to_vec(response)?;
    frame.push(b'\n');
    writer.write_all(&frame).await?;
    writer.flush().await?;
    Ok(())
}

/// Build a JSON-RPC error response value.
fn error_response(id: Value, code: i32, message: &str) -> Value {
    serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "error": { "code": code, "message": message },
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn task_reply_rejects_empty_model_id() {
        let err = TaskReply::new("", "content", "", false).unwrap_err();
        assert!(
            err.to_string().contains("mandatory"),
            "expected mandatory-id error, got: {err}"
        );
    }

    #[test]
    fn task_reply_accepts_nonempty_model_id() {
        let r = TaskReply::new("qwen2.5-coder:32b", "hi", "", false).unwrap();
        assert_eq!(r.model_id, "qwen2.5-coder:32b");
        assert_eq!(r.content, "hi");
    }

    #[test]
    fn task_reply_sets_empty_diff_from_diff_string() {
        let r = TaskReply::new("m", "c", "", false).unwrap();
        assert!(r.empty_diff);

        let r = TaskReply::new("m", "c", "real\nchanges\n", true).unwrap();
        assert!(!r.empty_diff);
    }

    #[test]
    fn task_reply_serde_round_trip_preserves_model_id() {
        let r = TaskReply::new("m", "c", "d\n", true).unwrap();
        let json = serde_json::to_string(&r).unwrap();
        // The wire format must always include model_id.
        assert!(json.contains("\"model_id\":\"m\""));
        let back: TaskReply = serde_json::from_str(&json).unwrap();
        assert_eq!(back, r);
    }

    #[test]
    fn task_reply_deserialize_without_model_id_fails() {
        // Direct serde deserialization of a payload missing model_id
        // must fail — the field is required.
        let bad = r#"{"content":"c","diff":"","empty_diff":true,"diff_applied":false}"#;
        let err = serde_json::from_str::<TaskReply>(bad).unwrap_err();
        assert!(
            err.to_string().contains("model_id"),
            "expected missing-model_id error, got: {err}"
        );
    }

    #[test]
    fn task_reply_emission_shape_defaults_none() {
        let r = TaskReply::new("m", "c", "", false).unwrap();
        assert_eq!(r.emission_shape, None);
    }

    #[test]
    fn task_reply_with_emission_shape_builder() {
        let r = TaskReply::new("m", "c", "", false)
            .unwrap()
            .with_emission_shape("whole_files");
        assert_eq!(r.emission_shape.as_deref(), Some("whole_files"));
    }

    #[test]
    fn task_reply_omits_null_emission_shape_from_wire() {
        // The legacy newt-flat path produces None; the wire format
        // should not carry a `"emission_shape": null` key (downstream
        // consumers can pre-date the field).
        let r = TaskReply::new("m", "c", "", false).unwrap();
        let json = serde_json::to_string(&r).unwrap();
        assert!(
            !json.contains("emission_shape"),
            "expected emission_shape omitted when None, got: {json}"
        );
    }

    #[test]
    fn task_reply_carries_emission_shape_on_wire_when_set() {
        let r = TaskReply::new("m", "c", "", true)
            .unwrap()
            .with_emission_shape("whole_files");
        let json = serde_json::to_string(&r).unwrap();
        assert!(json.contains("\"emission_shape\":\"whole_files\""));
        let back: TaskReply = serde_json::from_str(&json).unwrap();
        assert_eq!(back.emission_shape.as_deref(), Some("whole_files"));
    }

    #[test]
    fn task_reply_old_wire_without_emission_shape_still_parses() {
        // A producer that pre-dates this field must still deserialize
        // cleanly — `emission_shape` and `raw_emission` are `serde(default)`.
        let old =
            r#"{"model_id":"m","content":"c","diff":"","empty_diff":true,"diff_applied":false}"#;
        let r: TaskReply = serde_json::from_str(old).unwrap();
        assert_eq!(r.model_id, "m");
        assert_eq!(r.emission_shape, None);
        assert_eq!(r.raw_emission, None);
    }

    #[test]
    fn task_reply_raw_emission_defaults_none() {
        let r = TaskReply::new("m", "c", "", false).unwrap();
        assert_eq!(r.raw_emission, None);
    }

    #[test]
    fn task_reply_with_raw_emission_builder() {
        let raw = "--- a/f\n+++ b/f\n@@ -1 +1 @@\n-a\n+b\n";
        let r = TaskReply::new("m", "c", "", false)
            .unwrap()
            .with_raw_emission(raw);
        assert_eq!(r.raw_emission.as_deref(), Some(raw));
    }

    #[test]
    fn task_reply_omits_null_raw_emission_from_wire() {
        // Like `emission_shape`, a `None` raw emission must not appear
        // as a `null` key on the wire.
        let r = TaskReply::new("m", "c", "", false).unwrap();
        let json = serde_json::to_string(&r).unwrap();
        assert!(
            !json.contains("raw_emission"),
            "expected raw_emission omitted when None, got: {json}"
        );
    }

    #[test]
    fn task_reply_carries_raw_emission_on_wire_when_set() {
        let r = TaskReply::new("m", "c", "d\n", true)
            .unwrap()
            .with_raw_emission("raw");
        let json = serde_json::to_string(&r).unwrap();
        assert!(json.contains("\"raw_emission\":\"raw\""));
        let back: TaskReply = serde_json::from_str(&json).unwrap();
        assert_eq!(back, r);
    }

    #[test]
    fn looks_like_unified_diff_detects_headers() {
        assert!(looks_like_unified_diff(
            "--- a/f\n+++ b/f\n@@ -1,1 +1,1 @@\n-a\n+b\n"
        ));
        assert!(!looks_like_unified_diff("just prose"));
        assert!(!looks_like_unified_diff("--- only the old header"));
        assert!(!looks_like_unified_diff("+++ only the new header"));
        assert!(!looks_like_unified_diff(""));
    }
}