use anyhow::{Context, Result};
use minio::s3::builders::ObjectContent;
use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl;
use minio::s3::types::S3Api;
use minio::s3::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use crate::provider::Message;
use crate::session::faults::Fault;
/// File name of the per-session history object; it is the last path segment
/// of the key produced by `HistorySinkConfig::object_key`.
const HISTORY_OBJECT_NAME: &str = "history.jsonl";
/// Key prefix returned by `default_prefix` when no prefix is configured.
const DEFAULT_PREFIX: &str = "history/";
/// Connection settings for the S3/MinIO-backed history sink.
///
/// `Debug` is implemented by hand (instead of derived) so that `secret_key`
/// is never written to logs or panic messages through `{:?}` formatting.
#[derive(Clone, Serialize, Deserialize)]
pub struct HistorySinkConfig {
    /// Endpoint of the object store; parsed as a `BaseUrl` when a client is built.
    pub endpoint: String,
    /// Access key handed to the static credentials provider.
    pub access_key: String,
    /// Secret key handed to the static credentials provider. Redacted in `Debug` output.
    pub secret_key: String,
    /// Bucket that history objects are written to and read from.
    pub bucket: String,
    /// Key prefix placed in front of `<session_id>/history.jsonl`.
    /// Falls back to `default_prefix()` when absent from deserialized input.
    #[serde(default = "default_prefix")]
    pub prefix: String,
}

impl std::fmt::Debug for HistorySinkConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HistorySinkConfig")
            .field("endpoint", &self.endpoint)
            .field("access_key", &self.access_key)
            // Never print the credential itself.
            .field("secret_key", &"<redacted>")
            .field("bucket", &self.bucket)
            .field("prefix", &self.prefix)
            .finish()
    }
}
/// Returns an owned copy of `DEFAULT_PREFIX`; used as the serde default for
/// `HistorySinkConfig::prefix` and as the fallback in `HistorySinkConfig::from_env`.
fn default_prefix() -> String {
    String::from(DEFAULT_PREFIX)
}
impl HistorySinkConfig {
    /// Builds a config from `CODETETHER_HISTORY_S3_*` environment variables.
    ///
    /// Returns `Ok(None)` when any of `ENDPOINT`, `BUCKET`, `ACCESS_KEY` or
    /// `SECRET_KEY` is unset, not valid Unicode, or blank. `PREFIX` is
    /// optional: when unset or blank `default_prefix()` is used, otherwise a
    /// trailing `/` is appended if missing.
    ///
    /// All values are stored with surrounding whitespace removed, so a value
    /// such as `" http://host "` no longer passes the blank check only to
    /// break URL parsing later.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`; the `Result` wrapper is kept
    /// for interface compatibility.
    pub fn from_env() -> Result<Option<Self>> {
        // Reads `name` and yields its trimmed value, or `None` when the
        // variable is unset, not valid Unicode, or only whitespace.
        fn non_blank_var(name: &str) -> Option<String> {
            let raw = std::env::var(name).ok()?;
            let trimmed = raw.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        }

        let Some(endpoint) = non_blank_var("CODETETHER_HISTORY_S3_ENDPOINT") else {
            return Ok(None);
        };
        let Some(bucket) = non_blank_var("CODETETHER_HISTORY_S3_BUCKET") else {
            return Ok(None);
        };
        let Some(access_key) = non_blank_var("CODETETHER_HISTORY_S3_ACCESS_KEY") else {
            return Ok(None);
        };
        let Some(secret_key) = non_blank_var("CODETETHER_HISTORY_S3_SECRET_KEY") else {
            return Ok(None);
        };

        // Normalise the prefix so `object_key` can concatenate without
        // having to insert a separator itself.
        let prefix = non_blank_var("CODETETHER_HISTORY_S3_PREFIX")
            .map(|p| if p.ends_with('/') { p } else { format!("{p}/") })
            .unwrap_or_else(default_prefix);

        Ok(Some(Self {
            endpoint,
            access_key,
            secret_key,
            bucket,
            prefix,
        }))
    }

    /// Returns the object key for a session's history file:
    /// `<prefix><session_id>/history.jsonl`.
    ///
    /// The prefix is used verbatim; no separator is inserted between it and
    /// `session_id`.
    pub fn object_key(&self, session_id: &str) -> String {
        format!("{}{session_id}/{HISTORY_OBJECT_NAME}", self.prefix)
    }
}
/// Constructs a MinIO client for `config` using its static access/secret key pair.
///
/// # Errors
///
/// Fails when `config.endpoint` cannot be parsed as a `BaseUrl`, or when the
/// client builder itself reports an error.
fn build_client(config: &HistorySinkConfig) -> Result<MinioClient> {
    let parsed_endpoint = BaseUrl::from_str(&config.endpoint);
    let base_url = parsed_endpoint
        .with_context(|| format!("Invalid MinIO endpoint: {}", config.endpoint))?;

    let static_creds = StaticProvider::new(&config.access_key, &config.secret_key, None);
    let builder = MinioClientBuilder::new(base_url).provider(Some(Box::new(static_creds)));

    builder
        .build()
        .context("Failed to build MinIO client for history sink")
}
/// Serializes `messages[start..]` as JSON Lines: one JSON document per
/// message, each terminated by `\n`.
///
/// A `start` at or beyond `messages.len()` yields an empty string.
///
/// # Errors
///
/// Returns an error as soon as any message fails to serialize.
pub fn encode_jsonl_delta(messages: &[Message], start: usize) -> Result<String> {
    // An out-of-range `start` behaves like an empty tail rather than panicking.
    let pending: &[Message] = messages.get(start..).unwrap_or_default();

    pending
        .iter()
        .try_fold(String::new(), |mut out, message| -> Result<String> {
            let encoded = serde_json::to_string(message)
                .context("failed to serialize Message to JSON for sink")?;
            out.push_str(&encoded);
            out.push('\n');
            Ok(out)
        })
}
/// Uploads the complete message history of `session_id` as a single JSONL
/// object at `config.object_key(session_id)` in `config.bucket`.
///
/// The whole history is encoded and sent with one PUT of the full body.
///
/// # Errors
///
/// Fails when a message cannot be serialized, the client cannot be built, or
/// the PUT request fails.
pub async fn upload_full_history(
    config: &HistorySinkConfig,
    session_id: &str,
    messages: &[Message],
) -> Result<()> {
    let payload = encode_jsonl_delta(messages, 0)?.into_bytes();
    // Capture the size before the buffer is moved into the request body.
    let byte_len = payload.len();

    let client = build_client(config)?;
    let key = config.object_key(session_id);

    let put_outcome = client
        .put_object_content(&config.bucket, &key, ObjectContent::from(payload))
        .send()
        .await;
    put_outcome.with_context(|| {
        format!(
            "failed to PUT s3://{}/{key} ({} bytes)",
            config.bucket, byte_len
        )
    })?;

    tracing::debug!(
        bucket = %config.bucket,
        key = %key,
        bytes = byte_len,
        "history sink upload complete"
    );
    Ok(())
}
/// Serializable reference to an object (or a byte slice of one) in the
/// history bucket; consumed by `resolve_pointer`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PointerHandle {
/// Bucket that holds the referenced object.
pub bucket: String,
/// Object key inside `bucket`.
pub key: String,
/// Optional `(start, end)` byte offsets. `resolve_pointer` requests
/// `end - start` bytes beginning at `start`; `None` means the whole object.
/// Defaults to `None` when the field is missing from serialized input.
#[serde(default)]
pub byte_range: Option<(u64, u64)>,
}
impl PointerHandle {
    /// Creates a handle that refers to the entire history object of
    /// `session_id` (no byte range) in the bucket named by `config`.
    pub fn for_session(config: &HistorySinkConfig, session_id: &str) -> Self {
        let key = config.object_key(session_id);
        let bucket = config.bucket.clone();
        Self {
            bucket,
            key,
            byte_range: None,
        }
    }
}
/// Fetches the bytes that `handle` refers to.
///
/// When `handle.byte_range` is `Some((start, end))`, only `end - start` bytes
/// starting at offset `start` are requested; otherwise the whole object is read.
///
/// # Errors
///
/// * `Fault::NoMatch` — the byte range is empty (`end <= start`) or the
///   returned body is empty.
/// * `Fault::BackendError` — the client could not be built, the GET request
///   failed, or the response body could not be read.
pub async fn resolve_pointer(
    config: &HistorySinkConfig,
    handle: &PointerHandle,
) -> Result<Vec<u8>, Fault> {
    let client = build_client(config).map_err(|e| Fault::BackendError {
        // `{e:#}` renders the whole anyhow context chain; plain `{e}` would
        // keep only the outermost message and hide the underlying cause.
        reason: format!("minio client build failed: {e:#}"),
    })?;

    let (offset, length) = match handle.byte_range {
        Some((start, end)) => {
            // `saturating_sub` makes an inverted range behave like an empty one.
            let len = end.saturating_sub(start);
            if len == 0 {
                return Err(Fault::NoMatch);
            }
            (Some(start), Some(len))
        }
        None => (None, None),
    };

    let body = client
        .get_object(&handle.bucket, &handle.key)
        .offset(offset)
        .length(length)
        .send()
        .await
        .map_err(|e| Fault::BackendError {
            reason: format!("GET s3://{}/{} failed: {e}", handle.bucket, handle.key),
        })?
        .content
        .to_segmented_bytes()
        .await
        .map_err(|e| Fault::BackendError {
            reason: format!("read body for {} failed: {e}", handle.key),
        })?
        .to_bytes();

    if body.is_empty() {
        return Err(Fault::NoMatch);
    }
    Ok(body.to_vec())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{ContentPart, Message, Role};

    /// Config fixture shared by the key/handle tests.
    fn sample_config() -> HistorySinkConfig {
        HistorySinkConfig {
            endpoint: "http://x".to_string(),
            access_key: "a".to_string(),
            secret_key: "s".to_string(),
            bucket: "b".to_string(),
            prefix: "history/".to_string(),
        }
    }

    /// Builds a message holding a single text part.
    fn text_message(role: Role, text: &str) -> Message {
        Message {
            role,
            content: vec![ContentPart::Text {
                text: text.to_string(),
            }],
        }
    }

    #[test]
    fn config_object_key_joins_prefix_and_session_id() {
        let key = sample_config().object_key("abc-123");
        assert_eq!(key, "history/abc-123/history.jsonl");
    }

    #[test]
    fn encode_jsonl_delta_is_line_per_message_and_skippable() {
        let msgs = vec![
            text_message(Role::User, "one"),
            text_message(Role::Assistant, "two"),
        ];

        // Whole history: one line per message.
        let full = encode_jsonl_delta(&msgs, 0).unwrap();
        assert_eq!(full.lines().count(), 2);
        assert!(full.contains("\"one\""));
        assert!(full.contains("\"two\""));

        // Skipping the first message leaves only the second.
        let tail = encode_jsonl_delta(&msgs, 1).unwrap();
        assert_eq!(tail.lines().count(), 1);
        assert!(tail.contains("\"two\""));

        // Starting at the end produces no output at all.
        let nothing = encode_jsonl_delta(&msgs, 2).unwrap();
        assert!(nothing.is_empty());
    }

    #[test]
    fn pointer_handle_for_session_targets_history_object() {
        let handle = PointerHandle::for_session(&sample_config(), "sess");
        assert_eq!(handle.bucket, "b");
        assert_eq!(handle.key, "history/sess/history.jsonl");
        assert!(handle.byte_range.is_none());
    }

    #[test]
    fn pointer_handle_round_trips_through_serde() {
        let original = PointerHandle {
            bucket: "b".into(),
            key: "k".into(),
            byte_range: Some((16, 64)),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PointerHandle = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn from_env_returns_none_when_endpoint_unset() {
        // SAFETY: assumes no other thread reads or writes the process
        // environment while this test runs — TODO confirm no other test in
        // the crate touches CODETETHER_HISTORY_S3_* concurrently.
        unsafe {
            std::env::remove_var("CODETETHER_HISTORY_S3_ENDPOINT");
        }
        let loaded = HistorySinkConfig::from_env().unwrap();
        assert!(loaded.is_none());
    }
}