sapphire-agent 0.6.0

A personal AI assistant agent with Matrix/Discord channels, Anthropic backend, and a sapphire-workspace memory layer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
//! A2A (Agent2Agent Protocol) v1 server endpoints.
//!
//! Exposes:
//!   - `POST /a2a` — JSON-RPC 2.0 dispatch. Currently only `SendMessage`
//!     is implemented (synchronous: receive a `Message`, drive the
//!     internal chat turn to completion, return a terminal `Task`).
//!   - `GET /.well-known/agent-card.json` — A2A agent card discovery.
//!
//! Auth: `Authorization: Bearer <token>` matched against
//! `[room_profile.<n>].api_keys`. The match resolves the request's
//! `room_profile` implicitly — clients don't need to (and can't) name
//! it. A token leak therefore exposes one profile, not all.
//!
//! Out of scope (v1): `SendStreamingMessage` (SSE), `GetTask`,
//! `CancelTask`, `SubscribeToTask`, push notifications, `FilePart` `url`
//! (privacy/security: requires explicit operator opt-in), `DataPart`.
//! `FilePart` inline (`raw` base64) for `image/*` is supported and routes
//! through the existing multimodal provider path. Wire-format types come
//! from `a2a-lf`; the JSON-RPC dispatch is hand-rolled here to share
//! `ServeState` with the existing `/rpc` endpoint.

use std::convert::Infallible;
use std::sync::Arc;

use a2a::{
    Message, Part, PartContent, Role, SendMessageRequest, SendMessageResponse, Task, TaskState,
    TaskStatus, new_task_id,
};
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::response::IntoResponse;
use axum::{Json, response::sse::Event};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
use chrono::Utc;
use serde_json::{Value, json};
use tokio::sync::mpsc;

use super::ServeState;
use crate::provider::ChatMessage;

/// Method name for the only A2A method we implement. The A2A v1 spec
/// uses PascalCase JSON-RPC method names (`SendMessage`, `GetTask`,
/// …), unlike the v0.3 slash form (`message/send`).
const METHOD_SEND_MESSAGE: &str = "SendMessage";

/// JSON-RPC error codes used by this handler. Standard 2.0 codes plus
/// the A2A-spec-aligned application range (-32000 … -32099 reserved).
mod codes {
    pub const PARSE_ERROR: i32 = -32700;
    pub const INVALID_REQUEST: i32 = -32600;
    pub const METHOD_NOT_FOUND: i32 = -32601;
    pub const INVALID_PARAMS: i32 = -32602;
    pub const INTERNAL_ERROR: i32 = -32603;
    /// A2A "authenticated request required" application-level code.
    pub const AUTH_REQUIRED: i32 = -32001;
}

// ---------------------------------------------------------------------------
// Agent Card
// ---------------------------------------------------------------------------

/// `GET /.well-known/agent-card.json` — A2A discovery endpoint.
///
/// Returns 404 when A2A is disabled so probes don't reveal that the
/// server exists at all when the operator hasn't opted in.
pub async fn handle_agent_card(State(state): State<Arc<ServeState>>) -> impl IntoResponse {
    let cfg = match state.config.a2a.as_ref() {
        Some(cfg) if cfg.enabled => cfg,
        _ => return (StatusCode::NOT_FOUND, "A2A disabled").into_response(),
    };

    let name = cfg
        .agent_name
        .clone()
        .unwrap_or_else(|| "sapphire-agent".to_string());
    let description = cfg.agent_description.clone().unwrap_or_else(|| {
        "Personal partner AI agent with persistent character, memory, and tools.".to_string()
    });
    let url = cfg.public_url.clone().unwrap_or_default();

    let card = json!({
        "name": name,
        "description": description,
        "version": env!("CARGO_PKG_VERSION"),
        "supportedInterfaces": [
            {
                "url": url,
                "protocolBinding": "jsonrpc",
                "protocolVersion": a2a::VERSION,
            }
        ],
        "capabilities": {
            "streaming": false,
            "pushNotifications": false,
            "extendedAgentCard": false,
        },
        "defaultInputModes": ["text/plain", "image/jpeg", "image/png", "image/gif", "image/webp"],
        "defaultOutputModes": ["text/plain"],
        "skills": [
            {
                "id": "chat",
                "name": "Chat with the agent",
                "description":
                    "Hold a multi-turn conversation; the agent remembers context across calls \
                     within the same contextId and applies its server-side persona / memory. \
                     Inline images (FilePart raw) are accepted for vision-capable backends.",
                "tags": ["chat", "conversation", "vision"],
                "inputModes": ["text/plain", "image/jpeg", "image/png", "image/gif", "image/webp"],
                "outputModes": ["text/plain"]
            }
        ],
        "securitySchemes": {
            "bearer": {
                "httpAuthSecurityScheme": {
                    "scheme": "Bearer"
                }
            }
        },
        "securityRequirements": [
            { "bearer": [] }
        ]
    });

    (StatusCode::OK, Json(card)).into_response()
}

// ---------------------------------------------------------------------------
// POST /a2a — JSON-RPC 2.0 dispatch
// ---------------------------------------------------------------------------

/// JSON-RPC 2.0 envelope. Kept local so we don't depend on `a2a::JsonRpcId`
/// (which would force us to convert IDs in/out of the existing
/// serde_json `Value` flow).
#[derive(Debug, serde::Deserialize)]
struct JsonRpcEnvelope {
    #[serde(default)]
    jsonrpc: Option<String>,
    #[serde(default)]
    id: Option<Value>,
    method: String,
    #[serde(default)]
    params: Option<Value>,
}

pub async fn handle_a2a_post(
    State(state): State<Arc<ServeState>>,
    headers: HeaderMap,
    body: axum::body::Bytes,
) -> axum::response::Response {
    // 0. Feature gate
    let enabled = state.config.a2a.as_ref().is_some_and(|c| c.enabled);
    if !enabled {
        return (StatusCode::NOT_FOUND, "A2A disabled").into_response();
    }

    // 1. Parse envelope. We parse manually so a malformed body returns
    //    a JSON-RPC parse-error response rather than a 400 — A2A
    //    clients expect a JSON-RPC envelope on every reply.
    let envelope: JsonRpcEnvelope = match serde_json::from_slice(&body) {
        Ok(env) => env,
        Err(e) => {
            return jsonrpc_error_response(
                Value::Null,
                codes::PARSE_ERROR,
                &format!("invalid JSON: {e}"),
            );
        }
    };
    let req_id = envelope.id.clone().unwrap_or(Value::Null);
    if envelope.jsonrpc.as_deref() != Some("2.0") {
        return jsonrpc_error_response(
            req_id,
            codes::INVALID_REQUEST,
            "jsonrpc field must be \"2.0\"",
        );
    }

    // 2. Bearer auth → room_profile reverse lookup. Empty/missing bearer
    //    returns 401 at the HTTP layer (no JSON-RPC envelope) per A2A
    //    spec: auth failures are an HTTP-level concern, not an
    //    application error.
    let bearer = match extract_bearer(&headers) {
        Some(b) => b,
        None => {
            return (StatusCode::UNAUTHORIZED, "missing bearer token").into_response();
        }
    };
    let profile_name = match state.config.resolve_a2a_token(&bearer) {
        Some(name) => name.to_string(),
        None => {
            return jsonrpc_error_response(
                req_id,
                codes::AUTH_REQUIRED,
                "unknown or revoked bearer token",
            );
        }
    };

    // 3. Method dispatch
    match envelope.method.as_str() {
        METHOD_SEND_MESSAGE => {
            handle_send_message(state, req_id, envelope.params, profile_name).await
        }
        other => jsonrpc_error_response(
            req_id,
            codes::METHOD_NOT_FOUND,
            &format!("A2A method '{other}' is not implemented"),
        ),
    }
}

// ---------------------------------------------------------------------------
// SendMessage
// ---------------------------------------------------------------------------

async fn handle_send_message(
    state: Arc<ServeState>,
    req_id: Value,
    params: Option<Value>,
    profile_name: String,
) -> axum::response::Response {
    let raw = params.unwrap_or(Value::Null);
    let request: SendMessageRequest = match serde_json::from_value(raw) {
        Ok(r) => r,
        Err(e) => {
            return jsonrpc_error_response(
                req_id,
                codes::INVALID_PARAMS,
                &format!("malformed SendMessageRequest: {e}"),
            );
        }
    };

    // Extract text + inline image parts. `FilePart` with `url` is rejected
    // (server-side URL fetch is a privacy/security choice that should
    // require explicit operator opt-in); `DataPart` is rejected in v1
    // until we have a structured-tool routing story.
    let (user_text, images) = match collect_text_and_images(&request.message.parts) {
        Ok(v) => v,
        Err(e) => {
            return jsonrpc_error_response(req_id, codes::INVALID_PARAMS, &e);
        }
    };
    if user_text.trim().is_empty() && images.is_empty() {
        return jsonrpc_error_response(
            req_id,
            codes::INVALID_PARAMS,
            "message must contain at least one non-empty text part or an inline image",
        );
    }
    let user_msg = if images.is_empty() {
        ChatMessage::user(&user_text)
    } else {
        ChatMessage::user_with_images(&user_text, images)
    };

    // contextId: client-supplied (resume) or server-generated (new). The
    // params-level field wins over message-level if both are present —
    // A2A spec lets either carry it.
    let context_id = request
        .message
        .context_id
        .clone()
        .filter(|s| !s.is_empty())
        .unwrap_or_else(a2a::new_context_id);

    // Internal session id: deterministic from (profile, contextId) so a
    // crash-restart with the same contextId resumes the same JSONL
    // session. Prefix `a2a-` makes the kind self-documenting in logs;
    // hyphens (not colons) keep the resulting `<session_id>.jsonl`
    // filename portable to Windows.
    let session_id = format!("a2a-{profile_name}-{context_id}");

    // Pin the profile so `run_llm_turn`'s provider/namespace resolution
    // picks up the right room_profile.
    state
        .session_room_profiles
        .lock()
        .await
        .insert(session_id.clone(), profile_name.clone());

    // run_llm_turn streams notifications (tool_start/tool_end, errors)
    // through this mpsc; we drain into oblivion since A2A v1 doesn't
    // surface intermediate progress to the caller. A bounded buffer
    // keeps the producer from blocking if we drain slowly.
    let (tx, mut rx) = mpsc::channel::<Result<Event, Infallible>>(32);
    let drain = tokio::spawn(async move {
        while rx.recv().await.is_some() {
            // discard — A2A v1 is synchronous, no SSE relay
        }
    });

    let outcome = super::run_llm_turn(
        Arc::clone(&state),
        session_id.clone(),
        user_msg,
        req_id.clone(),
        tx,
        None,
    )
    .await;
    drop(drain); // sender dropped at end of run_llm_turn, drain returns

    // Build the terminal Task. Failed turns surface as TASK_STATE_FAILED
    // with a diagnostic message part, matching what A2A clients expect.
    let (state_enum, reply_text) = match outcome.text {
        Some(t) if !t.is_empty() => (TaskState::Completed, t),
        Some(_) => (TaskState::Failed, "(empty response)".to_string()),
        None => (
            TaskState::Failed,
            "agent failed to generate a reply".to_string(),
        ),
    };

    // Mint the new task id once and reference it from both the Task and
    // the inner reply Message. Per A2A v1.0, `Message.taskId` identifies
    // the task this message belongs to — without it, clients that key
    // off `result.status.message.taskId` see `None` and may treat the
    // reply as orphaned.
    let task_id = new_task_id();
    let reply_message = Message {
        message_id: a2a::new_message_id(),
        context_id: Some(context_id.clone()),
        task_id: Some(task_id.clone()),
        role: Role::Agent,
        parts: vec![Part::text(reply_text)],
        metadata: None,
        extensions: None,
        reference_task_ids: None,
    };

    let task = Task {
        id: task_id,
        context_id,
        status: TaskStatus {
            state: state_enum,
            message: Some(reply_message),
            timestamp: Some(Utc::now()),
        },
        artifacts: None,
        history: None,
        metadata: None,
    };

    let response = SendMessageResponse::Task(task);
    let result = match serde_json::to_value(&response) {
        Ok(v) => v,
        Err(e) => {
            return jsonrpc_error_response(
                req_id,
                codes::INTERNAL_ERROR,
                &format!("failed to serialize Task: {e}"),
            );
        }
    };

    let body = json!({
        "jsonrpc": "2.0",
        "id": req_id,
        "result": result,
    });
    (StatusCode::OK, Json(body)).into_response()
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

fn extract_bearer(headers: &HeaderMap) -> Option<String> {
    let value = headers.get(axum::http::header::AUTHORIZATION)?;
    let s = value.to_str().ok()?;
    let token = s
        .strip_prefix("Bearer ")
        .or_else(|| s.strip_prefix("bearer "))?;
    let trimmed = token.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}

/// Extract text and inline images from `parts`. Returns
/// `(joined_text, Vec<(media_type, base64)>)` on success.
///
/// Rejects (with a caller-facing error string for JSON-RPC -32602):
/// - `PartContent::Url`: server-side URL fetch is a privacy/security
///   surface that needs explicit operator opt-in.
/// - `PartContent::Data`: structured-input routing is out of scope for v1.
/// - `PartContent::Raw` with a non-`image/*` `mediaType`: only the
///   multimodal provider path (image) is wired through today.
/// - `PartContent::Raw` without any `mediaType`: ambiguous; the spec
///   makes it optional but we need it to dispatch.
fn collect_text_and_images(parts: &[Part]) -> Result<(String, Vec<(String, String)>), String> {
    let mut text = String::new();
    let mut images: Vec<(String, String)> = Vec::new();
    for p in parts {
        match &p.content {
            PartContent::Text(s) => {
                if !text.is_empty() {
                    text.push('\n');
                }
                text.push_str(s);
            }
            PartContent::Raw(bytes) => {
                let Some(media_type) = p.media_type.as_deref() else {
                    return Err(
                        "FilePart with raw content requires a mediaType (e.g. image/jpeg)"
                            .to_string(),
                    );
                };
                if !media_type.starts_with("image/") {
                    return Err(format!(
                        "unsupported mediaType '{media_type}' for inline FilePart \
                         (v1 only routes image/* to the multimodal provider)"
                    ));
                }
                images.push((media_type.to_string(), BASE64_STANDARD.encode(bytes)));
            }
            PartContent::Url(_) => {
                return Err("FilePart with url is not supported (server-side URL fetch \
                     requires explicit operator opt-in)"
                    .to_string());
            }
            PartContent::Data(_) => {
                return Err("DataPart is not supported in A2A v1 of this agent".to_string());
            }
        }
    }
    Ok((text, images))
}

fn jsonrpc_error_response(id: Value, code: i32, message: &str) -> axum::response::Response {
    let body = json!({
        "jsonrpc": "2.0",
        "id": id,
        "error": { "code": code, "message": message },
    });
    (StatusCode::OK, Json(body)).into_response()
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn collect_text_only() {
        let parts = vec![Part::text("hello"), Part::text("world")];
        let (text, images) = collect_text_and_images(&parts).unwrap();
        assert_eq!(text, "hello\nworld");
        assert!(images.is_empty());
    }

    #[test]
    fn collect_text_plus_inline_image() {
        let img_bytes = b"\xff\xd8\xff\xe0fake-jpeg".to_vec();
        let parts = vec![
            Part::text("look at this"),
            Part::raw(img_bytes.clone()).with_media_type("image/jpeg"),
        ];
        let (text, images) = collect_text_and_images(&parts).unwrap();
        assert_eq!(text, "look at this");
        assert_eq!(images.len(), 1);
        assert_eq!(images[0].0, "image/jpeg");
        // Round-trip: the encoded blob must decode back to the original bytes.
        let decoded = BASE64_STANDARD.decode(&images[0].1).unwrap();
        assert_eq!(decoded, img_bytes);
    }

    #[test]
    fn raw_part_without_media_type_is_rejected() {
        let parts = vec![Part::raw(vec![1, 2, 3])];
        let err = collect_text_and_images(&parts).unwrap_err();
        assert!(err.contains("mediaType"), "got: {err}");
    }

    #[test]
    fn raw_part_with_non_image_media_type_is_rejected() {
        let parts = vec![Part::raw(vec![1, 2, 3]).with_media_type("application/pdf")];
        let err = collect_text_and_images(&parts).unwrap_err();
        assert!(err.contains("application/pdf"), "got: {err}");
    }

    #[test]
    fn url_part_is_rejected() {
        let parts = vec![Part::url("https://example.com/foo.jpg").with_media_type("image/jpeg")];
        let err = collect_text_and_images(&parts).unwrap_err();
        assert!(err.contains("url"), "got: {err}");
    }

    #[test]
    fn data_part_is_rejected() {
        let parts = vec![Part::data(json!({"k": "v"}))];
        let err = collect_text_and_images(&parts).unwrap_err();
        assert!(err.to_lowercase().contains("datapart"), "got: {err}");
    }
}