codetether_agent/session/history_sink.rs
1//! Durable MinIO/S3 sink for pure chat history.
2//!
3//! ## Why a second sink?
4//!
5//! Phase A of the history/context refactor makes [`Session::messages`] a
6//! pure, append-only record of *what happened*. The local JSON file at
7//! `~/.local/share/codetether/sessions/{id}.json` is the authoritative
8//! resume cache, but it also gets rewritten on every `save()` — so a
9//! crash between saves or a laptop loss evicts the record entirely.
10//!
11//! The history sink streams the pure transcript to a MinIO-compatible
12//! S3 bucket as an **append-only JSONL** object, one line per message.
13//! This serves three purposes that the Liu et al.
14//! (arXiv:2512.22087), Meta-Harness (arXiv:2603.28052), and ClawVM
15//! (arXiv:2604.10352) references all emphasise:
16//!
17//! 1. **Durable archive** for `session_recall` across machines and
18//! crashes.
19//! 2. **Filesystem-as-history substrate** — the virtual
20//! `context_browse` files served in Phase B are backed by these
21//! objects.
22//! 3. **Pointer-residency backing store** — when a
23//! [`ResidencyLevel::Pointer`](super::pages) page is selected, its
24//! handle points here.
25//!
26//! The sink is **env-gated** and **non-fatal**. When the endpoint isn't
27//! configured or the upload fails, the session continues; a single
28//! `tracing::warn!` records the error and the local file remains
29//! authoritative for resume.
30//!
31//! ## Configuration
32//!
33//! Reads five environment variables at session load / first save:
34//!
35//! | Variable | Required? | Purpose |
36//! |------------------------------------------|-----------|----------------------------------|
37//! | `CODETETHER_HISTORY_S3_ENDPOINT` | Yes | MinIO/S3 endpoint URL |
38//! | `CODETETHER_HISTORY_S3_BUCKET` | Yes | Bucket name |
39//! | `CODETETHER_HISTORY_S3_PREFIX` | No | Key prefix (default `history/`) |
40//! | `CODETETHER_HISTORY_S3_ACCESS_KEY` | Yes | S3 access key |
41//! | `CODETETHER_HISTORY_S3_SECRET_KEY` | Yes | S3 secret key |
42//!
//! When any *required* variable is unset or blank,
//! [`HistorySinkConfig::from_env`] returns `Ok(None)` and the sink is
//! silently disabled.
45//!
46//! ## Examples
47//!
48//! ```rust,no_run
49//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
50//! use codetether_agent::session::history_sink::HistorySinkConfig;
51//!
52//! match HistorySinkConfig::from_env() {
53//! Ok(Some(config)) => {
54//! assert!(!config.endpoint.is_empty());
55//! }
56//! Ok(None) => {
57//! // Env vars not set; sink disabled.
58//! }
59//! Err(e) => eprintln!("history sink misconfigured: {e}"),
60//! }
61//! # });
62//! ```
63
64use anyhow::{Context, Result};
65use minio::s3::builders::ObjectContent;
66use minio::s3::creds::StaticProvider;
67use minio::s3::http::BaseUrl;
68use minio::s3::types::S3Api;
69use minio::s3::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
70use serde::{Deserialize, Serialize};
71use std::str::FromStr;
72
73use crate::provider::Message;
74use crate::session::faults::Fault;
75
/// File name of the transcript object inside a session's key space.
///
/// [`HistorySinkConfig::object_key`] places it after the configured
/// [`HistorySinkConfig::prefix`] and the session id, producing keys of
/// the form `{prefix}{session_id}/history.jsonl`.
const HISTORY_OBJECT_NAME: &str = "history.jsonl";
79
/// Default object-key prefix.
///
/// Used by [`HistorySinkConfig::from_env`] when the
/// `CODETETHER_HISTORY_S3_PREFIX` environment variable is unset or
/// blank, and as the serde default for [`HistorySinkConfig::prefix`].
const DEFAULT_PREFIX: &str = "history/";
85
86/// Durable history-sink configuration.
87///
88/// Intentionally owned (not borrowed) so the config can be handed to a
89/// `tokio::spawn` fire-and-forget upload without tying its lifetime to
90/// the `Session`.
91///
92/// # Examples
93///
94/// ```rust
95/// use codetether_agent::session::history_sink::HistorySinkConfig;
96///
97/// let cfg = HistorySinkConfig {
98/// endpoint: "http://localhost:9000".to_string(),
99/// access_key: "minioadmin".to_string(),
100/// secret_key: "minioadmin".to_string(),
101/// bucket: "codetether".to_string(),
102/// prefix: "history/".to_string(),
103/// };
104/// assert_eq!(cfg.bucket, "codetether");
105/// ```
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct HistorySinkConfig {
108 /// MinIO / S3 endpoint URL, e.g. `http://localhost:9000`.
109 pub endpoint: String,
110 /// S3 access key.
111 pub access_key: String,
112 /// S3 secret key.
113 pub secret_key: String,
114 /// Destination bucket.
115 pub bucket: String,
116 /// Key prefix under the bucket. Always ends with `/`.
117 #[serde(default = "default_prefix")]
118 pub prefix: String,
119}
120
121fn default_prefix() -> String {
122 DEFAULT_PREFIX.to_string()
123}
124
125impl HistorySinkConfig {
126 /// Load the config from environment variables.
127 ///
128 /// Returns `Ok(None)` when any required variable is unset — callers
129 /// treat that as *sink disabled* and skip the upload. Returns
130 /// `Err(_)` only when an env variable is present but malformed.
131 ///
132 /// # Errors
133 ///
134 /// Never in the current implementation; reserved for future URL
135 /// / region validation.
136 pub fn from_env() -> Result<Option<Self>> {
137 let endpoint = match std::env::var("CODETETHER_HISTORY_S3_ENDPOINT") {
138 Ok(s) if !s.trim().is_empty() => s,
139 _ => return Ok(None),
140 };
141 let bucket = match std::env::var("CODETETHER_HISTORY_S3_BUCKET") {
142 Ok(s) if !s.trim().is_empty() => s,
143 _ => return Ok(None),
144 };
145 let access_key = match std::env::var("CODETETHER_HISTORY_S3_ACCESS_KEY") {
146 Ok(s) if !s.trim().is_empty() => s,
147 _ => return Ok(None),
148 };
149 let secret_key = match std::env::var("CODETETHER_HISTORY_S3_SECRET_KEY") {
150 Ok(s) if !s.trim().is_empty() => s,
151 _ => return Ok(None),
152 };
153 let prefix = std::env::var("CODETETHER_HISTORY_S3_PREFIX")
154 .ok()
155 .filter(|s| !s.trim().is_empty())
156 .map(|s| if s.ends_with('/') { s } else { format!("{s}/") })
157 .unwrap_or_else(default_prefix);
158
159 Ok(Some(Self {
160 endpoint,
161 access_key,
162 secret_key,
163 bucket,
164 prefix,
165 }))
166 }
167
168 /// Object key for the full transcript of `session_id`.
169 ///
170 /// Example: `history/1234-abcd/history.jsonl`.
171 pub fn object_key(&self, session_id: &str) -> String {
172 format!("{}{session_id}/{HISTORY_OBJECT_NAME}", self.prefix)
173 }
174}
175
176/// Build a MinIO client from a [`HistorySinkConfig`].
177fn build_client(config: &HistorySinkConfig) -> Result<MinioClient> {
178 let base_url = BaseUrl::from_str(&config.endpoint)
179 .with_context(|| format!("Invalid MinIO endpoint: {}", config.endpoint))?;
180 let creds = StaticProvider::new(&config.access_key, &config.secret_key, None);
181 MinioClientBuilder::new(base_url)
182 .provider(Some(Box::new(creds)))
183 .build()
184 .context("Failed to build MinIO client for history sink")
185}
186
187/// Encode `messages[start..]` as JSONL — one line per message.
188///
189/// Separate from the upload path so it is unit-testable without a
190/// MinIO server.
191pub fn encode_jsonl_delta(messages: &[Message], start: usize) -> Result<String> {
192 let mut buf = String::new();
193 for msg in messages.iter().skip(start) {
194 let line =
195 serde_json::to_string(msg).context("failed to serialize Message to JSON for sink")?;
196 buf.push_str(&line);
197 buf.push('\n');
198 }
199 Ok(buf)
200}
201
202/// Overwrite the session's `history.jsonl` object with the full JSONL
203/// rendering of `messages`.
204///
205/// Simplest semantic: PUT the whole file every time. Delta uploads
206/// (append semantics over a chunked layout) are a Phase B follow-up —
207/// full-file rewrite keeps the sink single-round-trip and easy to
208/// reason about for Phase A.
209///
210/// # Errors
211///
212/// Returns the underlying MinIO error verbatim; the caller is expected
213/// to log-and-swallow (non-fatal sink) unless it is in a test that
214/// actually wants upload failure to surface.
215pub async fn upload_full_history(
216 config: &HistorySinkConfig,
217 session_id: &str,
218 messages: &[Message],
219) -> Result<()> {
220 let body = encode_jsonl_delta(messages, 0)?;
221 let bytes = body.into_bytes();
222 let byte_len = bytes.len();
223 let client = build_client(config)?;
224 let content = ObjectContent::from(bytes);
225 let key = config.object_key(session_id);
226 client
227 .put_object_content(&config.bucket, &key, content)
228 .send()
229 .await
230 .with_context(|| {
231 format!(
232 "failed to PUT s3://{}/{key} ({} bytes)",
233 config.bucket, byte_len
234 )
235 })?;
236 tracing::debug!(
237 bucket = %config.bucket,
238 key = %key,
239 bytes = byte_len,
240 "history sink upload complete"
241 );
242 Ok(())
243}
244
/// Stable locator for a single `ResidencyLevel::Pointer` page's
/// backing bytes.
///
/// The Phase B incremental derivation can degrade an `Evidence` page
/// (or any page whose degradation path permits it) to `Pointer` and
/// hand the corresponding [`PointerHandle`] to
/// [`resolve_pointer`] on demand. Keeping the handle pure data makes
/// it serialisable into the page sidecar without dragging the MinIO
/// client along.
///
/// # Examples
///
/// ```rust
/// use codetether_agent::session::history_sink::PointerHandle;
///
/// let handle = PointerHandle {
///     bucket: "codetether".to_string(),
///     key: "history/abc-123/history.jsonl".to_string(),
///     byte_range: Some((0, 512)),
/// };
/// assert_eq!(handle.byte_range, Some((0, 512)));
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PointerHandle {
    /// S3 bucket name. [`resolve_pointer`] reads from this bucket, not
    /// from the bucket named in the sink config.
    pub bucket: String,
    /// Full object key.
    pub key: String,
    /// Optional half-open `(start, end)` byte range. When `None`, the
    /// full object is fetched. [`resolve_pointer`] rejects a range
    /// whose `end` is not greater than `start`. Defaults to `None`
    /// when absent from serialized input.
    #[serde(default)]
    pub byte_range: Option<(u64, u64)>,
}
278
279impl PointerHandle {
280 /// Construct a handle that points at the whole history object for
281 /// `session_id` under the sink's configured prefix.
282 pub fn for_session(config: &HistorySinkConfig, session_id: &str) -> Self {
283 Self {
284 bucket: config.bucket.clone(),
285 key: config.object_key(session_id),
286 byte_range: None,
287 }
288 }
289}
290
291/// Fetch the bytes referenced by a [`PointerHandle`] from the MinIO
292/// sink.
293///
294/// Returns a typed [`Fault`] so callers get the same reason-code
295/// surface ClawVM §3 prescribes for context-construction failures
296/// (*silent* empty returns are forbidden).
297///
298/// # Errors
299///
300/// * [`Fault::BackendError`] when the MinIO client build or the GET
301/// itself fails.
302/// * [`Fault::NoMatch`] when the object is missing but the backend
303/// responded cleanly.
304///
305/// # Range semantics
306///
307/// When `byte_range` is `Some((start, end))`, the returned slice is
308/// truncated to that half-open range within the full object body.
309/// Ranges that fall off the end are clamped to `object.len()`.
310pub async fn resolve_pointer(
311 config: &HistorySinkConfig,
312 handle: &PointerHandle,
313) -> Result<Vec<u8>, Fault> {
314 let client = build_client(config).map_err(|e| Fault::BackendError {
315 reason: format!("minio client build failed: {e}"),
316 })?;
317
318 // Use HTTP Range requests when the handle specifies a byte range so
319 // we don't download the whole history object just to slice out a
320 // few KB. `offset` + `length` map directly to a `Range` header in
321 // the minio 0.3 builder.
322 let (offset, length) = match handle.byte_range {
323 Some((start, end)) => {
324 let len = end.saturating_sub(start);
325 if len == 0 {
326 return Err(Fault::NoMatch);
327 }
328 (Some(start), Some(len))
329 }
330 None => (None, None),
331 };
332
333 let body = client
334 .get_object(&handle.bucket, &handle.key)
335 .offset(offset)
336 .length(length)
337 .send()
338 .await
339 .map_err(|e| Fault::BackendError {
340 reason: format!("GET s3://{}/{} failed: {e}", handle.bucket, handle.key),
341 })?
342 .content
343 .to_segmented_bytes()
344 .await
345 .map_err(|e| Fault::BackendError {
346 reason: format!("read body for {} failed: {e}", handle.key),
347 })?
348 .to_bytes();
349 if body.is_empty() {
350 return Err(Fault::NoMatch);
351 }
352 Ok(body.to_vec())
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{ContentPart, Message, Role};

    /// Config fixture shared by the key/handle tests; nothing here
    /// ever contacts the endpoint.
    fn sample_config() -> HistorySinkConfig {
        HistorySinkConfig {
            endpoint: "http://x".to_string(),
            access_key: "a".to_string(),
            secret_key: "s".to_string(),
            bucket: "b".to_string(),
            prefix: "history/".to_string(),
        }
    }

    /// Single-part text message with the given role.
    fn text_message(role: Role, text: &str) -> Message {
        Message {
            role,
            content: vec![ContentPart::Text {
                text: text.to_string(),
            }],
        }
    }

    #[test]
    fn config_object_key_joins_prefix_and_session_id() {
        let key = sample_config().object_key("abc-123");
        assert_eq!(key, "history/abc-123/history.jsonl".to_string());
    }

    #[test]
    fn encode_jsonl_delta_is_line_per_message_and_skippable() {
        let msgs = vec![
            text_message(Role::User, "one"),
            text_message(Role::Assistant, "two"),
        ];

        // Whole transcript: both messages, one line each.
        let full = encode_jsonl_delta(&msgs, 0).unwrap();
        assert_eq!(full.lines().count(), 2);
        assert!(full.contains("\"one\""));
        assert!(full.contains("\"two\""));

        // Skipping the first message leaves only the second.
        let tail = encode_jsonl_delta(&msgs, 1).unwrap();
        assert_eq!(tail.lines().count(), 1);
        assert!(tail.contains("\"two\""));

        // Starting at the end produces no output at all.
        let nothing = encode_jsonl_delta(&msgs, 2).unwrap();
        assert!(nothing.is_empty());
    }

    #[test]
    fn pointer_handle_for_session_targets_history_object() {
        let cfg = sample_config();
        let handle = PointerHandle::for_session(&cfg, "sess");
        assert_eq!(handle.bucket, "b");
        assert_eq!(handle.key, "history/sess/history.jsonl");
        assert!(handle.byte_range.is_none());
    }

    #[test]
    fn pointer_handle_round_trips_through_serde() {
        let original = PointerHandle {
            bucket: "b".into(),
            key: "k".into(),
            byte_range: Some((16, 64)),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PointerHandle = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn from_env_returns_none_when_endpoint_unset() {
        // SAFETY: modifying env vars in tests is required for coverage
        // of the disabled-sink code path. `unsafe` is mandated by Rust
        // 2024's tightening of `remove_var`.
        unsafe {
            std::env::remove_var("CODETETHER_HISTORY_S3_ENDPOINT");
        }
        let loaded = HistorySinkConfig::from_env().unwrap();
        assert!(loaded.is_none());
    }
}
443}