Skip to main content

codetether_agent/session/
history_sink.rs

1//! Durable MinIO/S3 sink for pure chat history.
2//!
3//! ## Why a second sink?
4//!
5//! Phase A of the history/context refactor makes [`Session::messages`] a
6//! pure, append-only record of *what happened*. The local JSON file at
7//! `~/.local/share/codetether/sessions/{id}.json` is the authoritative
8//! resume cache, but it also gets rewritten on every `save()` — so a
9//! crash between saves or a laptop loss evicts the record entirely.
10//!
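//! The history sink mirrors the pure transcript to a MinIO-compatible
//! S3 bucket as a **JSONL** object, one line per message. The transcript
//! itself is append-only; in Phase A the whole object is re-uploaded on
//! each sync rather than appended to in place.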
13//! This serves three purposes that the Liu et al.
14//! (arXiv:2512.22087), Meta-Harness (arXiv:2603.28052), and ClawVM
15//! (arXiv:2604.10352) references all emphasise:
16//!
17//! 1. **Durable archive** for `session_recall` across machines and
18//!    crashes.
19//! 2. **Filesystem-as-history substrate** — the virtual
20//!    `context_browse` files served in Phase B are backed by these
21//!    objects.
22//! 3. **Pointer-residency backing store** — when a
23//!    [`ResidencyLevel::Pointer`](super::pages) page is selected, its
24//!    handle points here.
25//!
26//! The sink is **env-gated** and **non-fatal**. When the endpoint isn't
27//! configured or the upload fails, the session continues; a single
28//! `tracing::warn!` records the error and the local file remains
29//! authoritative for resume.
30//!
31//! ## Configuration
32//!
33//! Reads five environment variables at session load / first save:
34//!
35//! | Variable                                 | Required? | Purpose                          |
36//! |------------------------------------------|-----------|----------------------------------|
37//! | `CODETETHER_HISTORY_S3_ENDPOINT`         | Yes       | MinIO/S3 endpoint URL            |
38//! | `CODETETHER_HISTORY_S3_BUCKET`           | Yes       | Bucket name                      |
39//! | `CODETETHER_HISTORY_S3_PREFIX`           | No        | Key prefix (default `history/`)  |
40//! | `CODETETHER_HISTORY_S3_ACCESS_KEY`       | Yes       | S3 access key                    |
41//! | `CODETETHER_HISTORY_S3_SECRET_KEY`       | Yes       | S3 secret key                    |
42//!
43//! When any *required* variable is unset, [`HistorySinkConfig::from_env`]
44//! returns `Ok(None)` and the sink is silently disabled.
45//!
46//! ## Examples
47//!
48//! ```rust,no_run
49//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
50//! use codetether_agent::session::history_sink::HistorySinkConfig;
51//!
52//! match HistorySinkConfig::from_env() {
53//!     Ok(Some(config)) => {
54//!         assert!(!config.endpoint.is_empty());
55//!     }
56//!     Ok(None) => {
57//!         // Env vars not set; sink disabled.
58//!     }
59//!     Err(e) => eprintln!("history sink misconfigured: {e}"),
60//! }
61//! # });
62//! ```
63
64use anyhow::{Context, Result};
65use minio::s3::builders::ObjectContent;
66use minio::s3::creds::StaticProvider;
67use minio::s3::http::BaseUrl;
68use minio::s3::types::S3Api;
69use minio::s3::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
70use serde::{Deserialize, Serialize};
71use std::str::FromStr;
72
73use crate::provider::Message;
74use crate::session::faults::Fault;
75
/// File name of the per-session transcript object.
///
/// It is placed after `{prefix}{session_id}/` when
/// [`HistorySinkConfig::object_key`] builds the full key.
const HISTORY_OBJECT_NAME: &str = "history.jsonl";
79
/// Default object-key prefix, used when the `CODETETHER_HISTORY_S3_PREFIX`
/// environment variable is unset or blank (see
/// [`HistorySinkConfig::from_env`]).
///
/// It ends with `/` so that [`HistorySinkConfig::object_key`] can
/// concatenate it directly with a session id.
const DEFAULT_PREFIX: &str = "history/";
85
86/// Durable history-sink configuration.
87///
88/// Intentionally owned (not borrowed) so the config can be handed to a
89/// `tokio::spawn` fire-and-forget upload without tying its lifetime to
90/// the `Session`.
91///
92/// # Examples
93///
94/// ```rust
95/// use codetether_agent::session::history_sink::HistorySinkConfig;
96///
97/// let cfg = HistorySinkConfig {
98///     endpoint: "http://localhost:9000".to_string(),
99///     access_key: "minioadmin".to_string(),
100///     secret_key: "minioadmin".to_string(),
101///     bucket: "codetether".to_string(),
102///     prefix: "history/".to_string(),
103/// };
104/// assert_eq!(cfg.bucket, "codetether");
105/// ```
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct HistorySinkConfig {
108    /// MinIO / S3 endpoint URL, e.g. `http://localhost:9000`.
109    pub endpoint: String,
110    /// S3 access key.
111    pub access_key: String,
112    /// S3 secret key.
113    pub secret_key: String,
114    /// Destination bucket.
115    pub bucket: String,
116    /// Key prefix under the bucket. Always ends with `/`.
117    #[serde(default = "default_prefix")]
118    pub prefix: String,
119}
120
121fn default_prefix() -> String {
122    DEFAULT_PREFIX.to_string()
123}
124
125impl HistorySinkConfig {
126    /// Load the config from environment variables.
127    ///
128    /// Returns `Ok(None)` when any required variable is unset — callers
129    /// treat that as *sink disabled* and skip the upload. Returns
130    /// `Err(_)` only when an env variable is present but malformed.
131    ///
132    /// # Errors
133    ///
134    /// Never in the current implementation; reserved for future URL
135    /// / region validation.
136    pub fn from_env() -> Result<Option<Self>> {
137        let endpoint = match std::env::var("CODETETHER_HISTORY_S3_ENDPOINT") {
138            Ok(s) if !s.trim().is_empty() => s,
139            _ => return Ok(None),
140        };
141        let bucket = match std::env::var("CODETETHER_HISTORY_S3_BUCKET") {
142            Ok(s) if !s.trim().is_empty() => s,
143            _ => return Ok(None),
144        };
145        let access_key = match std::env::var("CODETETHER_HISTORY_S3_ACCESS_KEY") {
146            Ok(s) if !s.trim().is_empty() => s,
147            _ => return Ok(None),
148        };
149        let secret_key = match std::env::var("CODETETHER_HISTORY_S3_SECRET_KEY") {
150            Ok(s) if !s.trim().is_empty() => s,
151            _ => return Ok(None),
152        };
153        let prefix = std::env::var("CODETETHER_HISTORY_S3_PREFIX")
154            .ok()
155            .filter(|s| !s.trim().is_empty())
156            .map(|s| if s.ends_with('/') { s } else { format!("{s}/") })
157            .unwrap_or_else(default_prefix);
158
159        Ok(Some(Self {
160            endpoint,
161            access_key,
162            secret_key,
163            bucket,
164            prefix,
165        }))
166    }
167
168    /// Object key for the full transcript of `session_id`.
169    ///
170    /// Example: `history/1234-abcd/history.jsonl`.
171    pub fn object_key(&self, session_id: &str) -> String {
172        format!("{}{session_id}/{HISTORY_OBJECT_NAME}", self.prefix)
173    }
174}
175
176/// Build a MinIO client from a [`HistorySinkConfig`].
177fn build_client(config: &HistorySinkConfig) -> Result<MinioClient> {
178    let base_url = BaseUrl::from_str(&config.endpoint)
179        .with_context(|| format!("Invalid MinIO endpoint: {}", config.endpoint))?;
180    let creds = StaticProvider::new(&config.access_key, &config.secret_key, None);
181    MinioClientBuilder::new(base_url)
182        .provider(Some(Box::new(creds)))
183        .build()
184        .context("Failed to build MinIO client for history sink")
185}
186
187/// Encode `messages[start..]` as JSONL — one line per message.
188///
189/// Separate from the upload path so it is unit-testable without a
190/// MinIO server.
191pub fn encode_jsonl_delta(messages: &[Message], start: usize) -> Result<String> {
192    let mut buf = String::new();
193    for msg in messages.iter().skip(start) {
194        let line =
195            serde_json::to_string(msg).context("failed to serialize Message to JSON for sink")?;
196        buf.push_str(&line);
197        buf.push('\n');
198    }
199    Ok(buf)
200}
201
202/// Overwrite the session's `history.jsonl` object with the full JSONL
203/// rendering of `messages`.
204///
205/// Simplest semantic: PUT the whole file every time. Delta uploads
206/// (append semantics over a chunked layout) are a Phase B follow-up —
207/// full-file rewrite keeps the sink single-round-trip and easy to
208/// reason about for Phase A.
209///
210/// # Errors
211///
212/// Returns the underlying MinIO error verbatim; the caller is expected
213/// to log-and-swallow (non-fatal sink) unless it is in a test that
214/// actually wants upload failure to surface.
215pub async fn upload_full_history(
216    config: &HistorySinkConfig,
217    session_id: &str,
218    messages: &[Message],
219) -> Result<()> {
220    let body = encode_jsonl_delta(messages, 0)?;
221    let bytes = body.into_bytes();
222    let byte_len = bytes.len();
223    let client = build_client(config)?;
224    let content = ObjectContent::from(bytes);
225    let key = config.object_key(session_id);
226    client
227        .put_object_content(&config.bucket, &key, content)
228        .send()
229        .await
230        .with_context(|| {
231            format!(
232                "failed to PUT s3://{}/{key} ({} bytes)",
233                config.bucket, byte_len
234            )
235        })?;
236    tracing::debug!(
237        bucket = %config.bucket,
238        key = %key,
239        bytes = byte_len,
240        "history sink upload complete"
241    );
242    Ok(())
243}
244
/// Stable locator for a single `ResidencyLevel::Pointer` page's
/// backing bytes.
///
/// The Phase B incremental derivation can degrade an `Evidence` page
/// (or any page whose degradation path permits it) to `Pointer` and
/// hand the corresponding [`PointerHandle`] to
/// [`resolve_pointer`] on demand. Keeping the handle pure data makes
/// it serialisable into the page sidecar without dragging the MinIO
/// client along.
///
/// # Examples
///
/// ```rust
/// use codetether_agent::session::history_sink::PointerHandle;
///
/// let handle = PointerHandle {
///     bucket: "codetether".to_string(),
///     key: "history/abc-123/history.jsonl".to_string(),
///     byte_range: Some((0, 512)),
/// };
/// assert_eq!(handle.byte_range, Some((0, 512)));
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PointerHandle {
    /// S3 bucket name.
    pub bucket: String,
    /// Full object key, including the sink prefix
    /// (e.g. `history/{session_id}/history.jsonl`).
    pub key: String,
    /// Optional half-open `(start, end)` byte range. When `None`, the
    /// full object is fetched. An empty range (`end <= start`) is
    /// rejected by `resolve_pointer` with `Fault::NoMatch`. The field
    /// defaults to `None` when absent from serialized input.
    #[serde(default)]
    pub byte_range: Option<(u64, u64)>,
}
278
279impl PointerHandle {
280    /// Construct a handle that points at the whole history object for
281    /// `session_id` under the sink's configured prefix.
282    pub fn for_session(config: &HistorySinkConfig, session_id: &str) -> Self {
283        Self {
284            bucket: config.bucket.clone(),
285            key: config.object_key(session_id),
286            byte_range: None,
287        }
288    }
289}
290
291/// Fetch the bytes referenced by a [`PointerHandle`] from the MinIO
292/// sink.
293///
294/// Returns a typed [`Fault`] so callers get the same reason-code
295/// surface ClawVM §3 prescribes for context-construction failures
296/// (*silent* empty returns are forbidden).
297///
298/// # Errors
299///
300/// * [`Fault::BackendError`] when the MinIO client build or the GET
301///   itself fails.
302/// * [`Fault::NoMatch`] when the object is missing but the backend
303///   responded cleanly.
304///
305/// # Range semantics
306///
307/// When `byte_range` is `Some((start, end))`, the returned slice is
308/// truncated to that half-open range within the full object body.
309/// Ranges that fall off the end are clamped to `object.len()`.
310pub async fn resolve_pointer(
311    config: &HistorySinkConfig,
312    handle: &PointerHandle,
313) -> Result<Vec<u8>, Fault> {
314    let client = build_client(config).map_err(|e| Fault::BackendError {
315        reason: format!("minio client build failed: {e}"),
316    })?;
317
318    // Use HTTP Range requests when the handle specifies a byte range so
319    // we don't download the whole history object just to slice out a
320    // few KB. `offset` + `length` map directly to a `Range` header in
321    // the minio 0.3 builder.
322    let (offset, length) = match handle.byte_range {
323        Some((start, end)) => {
324            let len = end.saturating_sub(start);
325            if len == 0 {
326                return Err(Fault::NoMatch);
327            }
328            (Some(start), Some(len))
329        }
330        None => (None, None),
331    };
332
333    let body = client
334        .get_object(&handle.bucket, &handle.key)
335        .offset(offset)
336        .length(length)
337        .send()
338        .await
339        .map_err(|e| Fault::BackendError {
340            reason: format!("GET s3://{}/{} failed: {e}", handle.bucket, handle.key),
341        })?
342        .content
343        .to_segmented_bytes()
344        .await
345        .map_err(|e| Fault::BackendError {
346            reason: format!("read body for {} failed: {e}", handle.key),
347        })?
348        .to_bytes();
349    if body.is_empty() {
350        return Err(Fault::NoMatch);
351    }
352    Ok(body.to_vec())
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{ContentPart, Message, Role};

    /// Throwaway config with the default `history/` prefix.
    fn sample_config() -> HistorySinkConfig {
        HistorySinkConfig {
            endpoint: "http://x".to_string(),
            access_key: "a".to_string(),
            secret_key: "s".to_string(),
            bucket: "b".to_string(),
            prefix: "history/".to_string(),
        }
    }

    /// A message with a single text part.
    fn text_message(role: Role, text: &str) -> Message {
        Message {
            role,
            content: vec![ContentPart::Text {
                text: text.to_string(),
            }],
        }
    }

    #[test]
    fn config_object_key_joins_prefix_and_session_id() {
        let key = sample_config().object_key("abc-123");
        assert_eq!(key, "history/abc-123/history.jsonl".to_string());
    }

    #[test]
    fn encode_jsonl_delta_is_line_per_message_and_skippable() {
        let msgs = [
            text_message(Role::User, "one"),
            text_message(Role::Assistant, "two"),
        ];

        // Everything from index 0: both messages, one line each.
        let full = encode_jsonl_delta(&msgs, 0).unwrap();
        assert_eq!(full.lines().count(), 2);
        assert!(full.contains("\"one\""));
        assert!(full.contains("\"two\""));

        // Skipping the first message leaves only the second.
        let tail = encode_jsonl_delta(&msgs, 1).unwrap();
        assert_eq!(tail.lines().count(), 1);
        assert!(tail.contains("\"two\""));

        // Starting at len() encodes nothing.
        let nothing = encode_jsonl_delta(&msgs, 2).unwrap();
        assert!(nothing.is_empty());
    }

    #[test]
    fn pointer_handle_for_session_targets_history_object() {
        let handle = PointerHandle::for_session(&sample_config(), "sess");
        assert_eq!(handle.bucket, "b");
        assert_eq!(handle.key, "history/sess/history.jsonl");
        assert!(handle.byte_range.is_none());
    }

    #[test]
    fn pointer_handle_round_trips_through_serde() {
        let original = PointerHandle {
            bucket: "b".into(),
            key: "k".into(),
            byte_range: Some((16, 64)),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PointerHandle = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn from_env_returns_none_when_endpoint_unset() {
        // SAFETY: modifying env vars in tests is required for coverage
        // of the disabled-sink code path. `unsafe` is mandated by Rust
        // 2024's tightening of `remove_var`.
        unsafe {
            std::env::remove_var("CODETETHER_HISTORY_S3_ENDPOINT");
        }
        let loaded = HistorySinkConfig::from_env().unwrap();
        assert!(loaded.is_none());
    }
}