use std::sync::Arc;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use klieo_core::checkpoint::RunCheckpoint;
use klieo_core::error::BusError;
use klieo_core::KvStore;
use rand::rngs::OsRng;
use rand::RngCore;
use serde::{Deserialize, Serialize};
/// KV bucket name passed to every `get`/`cas`/`delete` call made by
/// `ResumeTicketStore`.
pub(crate) const RESUME_TICKET_BUCKET: &str = "klieo.mcp.resume-tickets";
/// Number of random bytes drawn for each minted token (32 bytes = 256 bits).
const TOKEN_ENTROPY_BYTES: usize = 32;
/// Marker value (a single NUL byte) that `claim` writes over a ticket once it
/// has been consumed. `peek` and `claim` report an entry whose value equals
/// this marker as absent. Live tickets are stored as JSON produced by
/// `serde_json::to_vec`, so they do not equal a lone NUL byte.
const CLAIMED_TOMBSTONE: &[u8] = b"\x00";
/// Payload stored under a resume-ticket token.
///
/// Serialized to JSON by `ResumeTicketStore::persist` and parsed back by
/// `ResumeTicketStore::peek` and `ResumeTicketStore::claim`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ResumeTicketRecord {
/// Principal recorded on the ticket.
/// NOTE(review): presumably the identity allowed to resume the run; this
/// file only stores and returns the value, so confirm against callers.
pub principal: String,
/// Workflow name recorded on the ticket; stored and returned verbatim.
pub workflow_name: String,
/// Run checkpoint carried by the ticket.
/// NOTE(review): presumably the state a resume continues from; confirm
/// against the code that consumes a claimed ticket.
pub checkpoint: RunCheckpoint,
/// UTC timestamp supplied by whoever builds the record. The store logic in
/// this file never reads it (no expiry is enforced here).
pub created_at: DateTime<Utc>,
}
/// Token-keyed store for `ResumeTicketRecord` values, backed by a shared
/// key-value store.
pub struct ResumeTicketStore {
/// Backend handle; every operation targets the `RESUME_TICKET_BUCKET` bucket.
kv: Arc<dyn KvStore>,
}
impl ResumeTicketStore {
    /// Builds a store that keeps its tickets in the supplied key-value backend.
    pub fn new(kv: Arc<dyn KvStore>) -> Self {
        Self { kv }
    }

    /// Generates a fresh ticket token.
    ///
    /// Fills a `TOKEN_ENTROPY_BYTES`-long buffer from `OsRng` and returns it
    /// encoded as unpadded URL-safe base64.
    pub fn mint_token() -> String {
        let mut entropy = [0u8; TOKEN_ENTROPY_BYTES];
        OsRng.fill_bytes(&mut entropy);
        URL_SAFE_NO_PAD.encode(entropy)
    }

    /// Writes `record` under `token` as JSON.
    ///
    /// The write is a `cas` with no expected revision.
    /// NOTE(review): presumably that means "create only if absent"; confirm
    /// against the `KvStore::cas` contract.
    ///
    /// # Errors
    ///
    /// * [`TicketStoreError::Encode`] when the record cannot be serialized.
    /// * [`TicketStoreError::Backend`] for any error returned by the `cas`
    ///   call (conflicts included).
    pub async fn persist(
        &self,
        token: &str,
        record: &ResumeTicketRecord,
    ) -> Result<(), TicketStoreError> {
        let payload = serde_json::to_vec(record).map_err(TicketStoreError::Encode)?;
        self.kv
            .cas(RESUME_TICKET_BUCKET, token, Bytes::from(payload), None)
            .await
            .map(|_| ())
            .map_err(TicketStoreError::Backend)
    }

    /// Reads the ticket stored under `token` without consuming it.
    ///
    /// Returns `Ok(None)` when the key is missing or holds the claimed
    /// tombstone.
    ///
    /// # Errors
    ///
    /// * [`TicketStoreError::Backend`] when the `get` call fails.
    /// * [`TicketStoreError::Decode`] when the stored bytes are not a valid
    ///   record.
    pub async fn peek(
        &self,
        token: &str,
    ) -> Result<Option<ResumeTicketRecord>, TicketStoreError> {
        let stored = self
            .kv
            .get(RESUME_TICKET_BUCKET, token)
            .await
            .map_err(TicketStoreError::Backend)?;
        match stored {
            Some(entry) => Self::decode_unclaimed(&entry.value),
            None => Ok(None),
        }
    }

    /// Consumes the ticket stored under `token`, handing its record to at most
    /// one caller.
    ///
    /// Steps, in order:
    /// 1. Read the entry; a missing key or a tombstone yields `Ok(None)`.
    /// 2. Decode the record (before any write, so a corrupt entry is left
    ///    untouched).
    /// 3. Swap the value for the tombstone with a `cas` pinned to the revision
    ///    that was read. A `CasConflict` yields `Ok(None)`.
    /// 4. Try to delete the key; a failure is only logged, because the
    ///    tombstone already marks the ticket as consumed.
    ///
    /// # Errors
    ///
    /// * [`TicketStoreError::Backend`] when the `get` fails or the `cas` fails
    ///   with anything other than a conflict.
    /// * [`TicketStoreError::Decode`] when the stored bytes are not a valid
    ///   record.
    pub async fn claim(
        &self,
        token: &str,
    ) -> Result<Option<ResumeTicketRecord>, TicketStoreError> {
        let stored = self
            .kv
            .get(RESUME_TICKET_BUCKET, token)
            .await
            .map_err(TicketStoreError::Backend)?;
        let Some(entry) = stored else {
            return Ok(None);
        };
        let Some(record) = Self::decode_unclaimed(&entry.value)? else {
            return Ok(None);
        };

        let tombstone = Bytes::from_static(CLAIMED_TOMBSTONE);
        let swap = self
            .kv
            .cas(
                RESUME_TICKET_BUCKET,
                token,
                tombstone,
                Some(entry.revision),
            )
            .await;
        match swap {
            Ok(_) => {}
            // The entry changed since it was read: report the ticket as gone.
            Err(BusError::CasConflict { .. }) => return Ok(None),
            Err(other) => return Err(TicketStoreError::Backend(other)),
        }

        if let Err(delete_err) = self.kv.delete(RESUME_TICKET_BUCKET, token).await {
            tracing::warn!(
                target: "klieo.mcp.resume_ticket",
                error = %delete_err,
                "best-effort delete after ticket claim failed; tombstone remains",
            );
        }
        Ok(Some(record))
    }

    /// Interprets the raw bytes of a stored entry: the claimed tombstone maps
    /// to `Ok(None)`, anything else is parsed as a JSON `ResumeTicketRecord`.
    fn decode_unclaimed(raw: &[u8]) -> Result<Option<ResumeTicketRecord>, TicketStoreError> {
        if raw == CLAIMED_TOMBSTONE {
            return Ok(None);
        }
        serde_json::from_slice::<ResumeTicketRecord>(raw)
            .map(Some)
            .map_err(TicketStoreError::Decode)
    }
}
/// Failures reported by `ResumeTicketStore` operations.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TicketStoreError {
/// `persist` could not serialize the record to JSON.
#[error("resume-ticket encode failed")]
Encode(#[source] serde_json::Error),
/// `peek` or `claim` found a stored value that does not parse as a
/// `ResumeTicketRecord`.
#[error("resume-ticket decode failed")]
Decode(#[source] serde_json::Error),
/// The key-value backend returned an error. `persist` maps every `cas`
/// error here; `claim` maps everything except `BusError::CasConflict`,
/// which it reports as `Ok(None)` instead.
#[error("resume-ticket KV failure")]
Backend(#[source] BusError),
}
#[cfg(test)]
mod tests {
    use super::*;
    use klieo_core::test_utils::fake_kv;

    /// Builds a record for `principal` around a minimal checkpoint that is
    /// deserialized from a JSON literal.
    fn record_for(principal: &str) -> ResumeTicketRecord {
        let checkpoint = serde_json::from_value(serde_json::json!({
            "run_id": klieo_core::ids::RunId::new(),
            "step_index": 1,
            "thread_id": "t-test",
            "messages": [],
            "pending_tool_calls": null,
            "created_at": "2026-06-18T00:00:00Z",
        }))
        .unwrap();
        ResumeTicketRecord {
            principal: principal.to_owned(),
            workflow_name: String::from("wf-test"),
            checkpoint,
            created_at: Utc::now(),
        }
    }

    /// A minted token decodes back to the configured number of bytes, and two
    /// consecutive mints differ.
    #[test]
    fn mint_token_yields_256_bits_of_url_safe_base64() {
        let first = ResumeTicketStore::mint_token();
        let raw = URL_SAFE_NO_PAD.decode(first.as_bytes()).unwrap();
        assert_eq!(
            raw.len(),
            TOKEN_ENTROPY_BYTES,
            "minted token must decode to exactly {TOKEN_ENTROPY_BYTES} bytes"
        );

        let second = ResumeTicketStore::mint_token();
        assert_ne!(first, second, "two mints must not collide");
    }

    /// A persisted record can be read back through `peek`.
    #[tokio::test]
    async fn persist_then_peek_returns_original_record() {
        let tickets = ResumeTicketStore::new(fake_kv());
        let token = ResumeTicketStore::mint_token();
        let original = record_for("alice@x");

        tickets.persist(&token, &original).await.unwrap();

        let fetched = tickets.peek(&token).await.unwrap().expect("ticket present");
        assert_eq!(fetched.principal, "alice@x");
        assert_eq!(fetched.workflow_name, "wf-test");
    }

    /// Peeking a token that was never stored yields `None`, not an error.
    #[tokio::test]
    async fn peek_unknown_token_is_none() {
        let tickets = ResumeTicketStore::new(fake_kv());
        let fetched = tickets.peek("never-persisted").await.unwrap();
        assert!(fetched.is_none());
    }

    /// The first claim returns the record; later claims and peeks see nothing.
    #[tokio::test]
    async fn claim_consumes_ticket_exactly_once() {
        let tickets = ResumeTicketStore::new(fake_kv());
        let token = ResumeTicketStore::mint_token();
        tickets.persist(&token, &record_for("alice@x")).await.unwrap();

        let initial = tickets.claim(&token).await.unwrap();
        assert!(
            initial.is_some(),
            "first claim must surface the persisted record"
        );

        let repeat = tickets.claim(&token).await.unwrap();
        assert!(
            repeat.is_none(),
            "second claim of the same token must return None — one-shot"
        );

        let after = tickets.peek(&token).await.unwrap();
        assert!(
            after.is_none(),
            "a consumed ticket peeks as absent so resume cannot re-authz against it"
        );
    }

    /// Eight tasks claim the same token; exactly one may receive the record.
    #[tokio::test]
    async fn concurrent_claim_yields_exactly_one_winner() {
        let tickets = Arc::new(ResumeTicketStore::new(fake_kv()));
        let token = ResumeTicketStore::mint_token();
        tickets.persist(&token, &record_for("alice@x")).await.unwrap();

        // Spawn every claimer before awaiting any of them.
        let mut handles = Vec::with_capacity(8);
        for _ in 0..8 {
            let tickets = Arc::clone(&tickets);
            let token = token.clone();
            handles.push(tokio::spawn(async move { tickets.claim(&token).await }));
        }

        let mut winners = 0usize;
        for handle in handles {
            let claimed = handle.await.unwrap().unwrap();
            if claimed.is_some() {
                winners += 1;
            }
        }
        assert_eq!(
            winners, 1,
            "exactly one concurrent claim must surface the record; got {winners}"
        );
    }
}