use arc_swap::ArcSwap;
use axum::{
Json,
extract::{Path, Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
fs::{self, File},
io::BufReader,
path::PathBuf,
sync::Arc,
};
use utoipa::{IntoParams, ToSchema};
use crate::AppState;
/// Returns the directory in which Amp thread JSON files are looked up.
///
/// The path is `<HOME>/.local/share/amp/threads`. When the `HOME` environment
/// variable cannot be read, `/tmp` is used as the base directory instead.
fn threads_dir() -> PathBuf {
    let base = match std::env::var("HOME") {
        Ok(home) => PathBuf::from(home),
        Err(_) => PathBuf::from(String::from("/tmp")),
    };
    base.join(".local/share/amp/threads")
}
/// Checks whether `id` has the shape of a thread identifier.
///
/// A valid identifier starts with `T-` and is followed by at least one
/// character, all of which are ASCII hex digits or `-`. Because `/` and `.`
/// are rejected, an accepted identifier cannot contain path separators or
/// `..` components.
fn is_valid_thread_id(id: &str) -> bool {
    match id.strip_prefix("T-") {
        Some(rest) if !rest.is_empty() => rest
            .chars()
            .all(|ch| ch == '-' || ch.is_ascii_hexdigit()),
        _ => false,
    }
}
/// Reduced on-disk view of a thread file, used when building list summaries.
///
/// Only the fields declared here are deserialized; any other keys present in
/// the JSON document are ignored. Keys are read in camelCase
/// (e.g. `agentMode`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawThreadSummary {
/// Thread identifier as stored in the file.
id: String,
/// Creation timestamp. NOTE(review): test fixtures look like Unix epoch
/// milliseconds — confirm the unit.
created: u64,
/// Optional title; absent in the file means `None`.
#[serde(default)]
title: Option<String>,
/// Per-message stubs; defaults to an empty list when the key is missing.
#[serde(default)]
messages: Vec<RawMessageStub>,
/// Optional agent mode string (`agentMode` in the file).
#[serde(default)]
agent_mode: Option<String>,
}
/// Minimal per-message view used by the summary parser: just the role and
/// the optional usage record. Other message keys are not deserialized.
#[derive(Deserialize)]
struct RawMessageStub {
/// Message role; the summary parser only aggregates usage for `"assistant"`.
role: String,
/// Usage record, if the message carries one.
#[serde(default)]
usage: Option<RawUsageStub>,
}
/// Subset of a message's usage record needed for summaries.
///
/// Keys are read in camelCase (`inputTokens`, `outputTokens`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawUsageStub {
/// Model name reported for this message, if any.
model: Option<String>,
/// Input token count, if reported.
input_tokens: Option<u64>,
/// Output token count, if reported.
output_tokens: Option<u64>,
}
/// Full on-disk view of a thread file, used for the thread detail endpoint.
///
/// Keys are read in camelCase. `v`, `id` and `created` are required; every
/// other field falls back to its default when missing.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawThread {
/// Numeric `v` field from the file. NOTE(review): presumably a format or
/// revision counter — confirm its meaning.
v: u64,
/// Thread identifier as stored in the file.
id: String,
/// Creation timestamp. NOTE(review): unit not established here — confirm.
created: u64,
/// Optional thread title.
#[serde(default)]
title: Option<String>,
/// Messages in file order; empty when the key is missing.
#[serde(default)]
messages: Vec<RawMessage>,
/// Optional agent mode string (`agentMode` in the file).
#[serde(default)]
agent_mode: Option<String>,
/// Links to other threads; empty when the key is missing.
#[serde(default)]
relationships: Vec<RawRelationship>,
/// Opaque `env` payload, passed through to the response unmodified.
#[serde(default)]
env: Option<Value>,
}
/// A single message as stored in a thread file (camelCase keys).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawMessage {
/// Message role string (e.g. `"user"`, `"assistant"` in the test fixtures).
role: String,
/// Numeric message identifier (`messageId` in the file); required.
message_id: u64,
/// Content blocks kept as untyped JSON; they are interpreted later by
/// `convert_content_block`.
#[serde(default)]
content: Vec<Value>,
/// Optional token usage record.
#[serde(default)]
usage: Option<RawUsage>,
/// Optional message state record.
#[serde(default)]
state: Option<RawMessageState>,
}
/// Full token usage record of a message (camelCase keys).
///
/// Every field is optional and defaults to `None` when missing.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawUsage {
/// Model name reported for the message.
#[serde(default)]
model: Option<String>,
/// `inputTokens` value from the file.
#[serde(default)]
input_tokens: Option<u64>,
/// `outputTokens` value from the file.
#[serde(default)]
output_tokens: Option<u64>,
/// `cacheCreationInputTokens` value from the file.
#[serde(default)]
cache_creation_input_tokens: Option<u64>,
/// `cacheReadInputTokens` value from the file.
#[serde(default)]
cache_read_input_tokens: Option<u64>,
/// `totalInputTokens` value from the file.
#[serde(default)]
total_input_tokens: Option<u64>,
}
/// State record attached to a message in the thread file.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawMessageState {
/// Value of the `type` key (renamed because `type` is a Rust keyword).
#[serde(rename = "type")]
state_type: String,
/// Optional `stopReason` value.
#[serde(default)]
stop_reason: Option<String>,
}
/// A link from this thread to another thread, as stored in the file.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawRelationship {
/// Identifier of the related thread; read from the key `threadID`
/// (explicit rename, since camelCase would give `threadId`).
#[serde(rename = "threadID")]
thread_id: String,
/// Value of the `type` key describing the relationship.
#[serde(rename = "type")]
rel_type: String,
/// Optional role string for the relationship.
#[serde(default)]
role: Option<String>,
}
/// Response body of the thread list endpoint.
#[derive(Serialize, ToSchema)]
pub struct AmpThreadListResponse {
/// The requested page of thread summaries.
pub threads: Vec<AmpThreadSummary>,
/// Number of threads matching the filter before pagination is applied.
pub total: usize,
}
/// Condensed description of one thread, as returned by the list endpoint.
///
/// Optional fields are omitted from the serialized JSON when they are `None`.
#[derive(Serialize, ToSchema)]
pub struct AmpThreadSummary {
/// Thread identifier taken from the thread file.
pub id: String,
/// Creation timestamp taken from the thread file.
pub created: u64,
/// Thread title, if the file has one.
#[serde(skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
/// Number of messages in the thread.
pub message_count: usize,
/// Agent mode string, if the file has one.
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_mode: Option<String>,
/// Model named by the last assistant usage record that names one.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_model: Option<String>,
/// Sum of input tokens over assistant messages that carry usage data;
/// `None` when no such message exists.
#[serde(skip_serializing_if = "Option::is_none")]
pub total_input_tokens: Option<u64>,
/// Sum of output tokens over assistant messages that carry usage data;
/// `None` when no such message exists.
#[serde(skip_serializing_if = "Option::is_none")]
pub total_output_tokens: Option<u64>,
/// Size of the thread file on disk, in bytes.
pub file_size_bytes: u64,
}
/// Full thread contents, as returned by the thread detail endpoint.
#[derive(Serialize, ToSchema)]
pub struct AmpThreadDetail {
/// Thread identifier taken from the thread file.
pub id: String,
/// The file's `v` field, passed through unchanged.
pub v: u64,
/// Creation timestamp taken from the thread file.
pub created: u64,
/// Thread title; omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
/// Agent mode string; omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_mode: Option<String>,
/// Converted messages, in file order.
pub messages: Vec<AmpMessage>,
/// Links to other threads; omitted from JSON when empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub relationships: Vec<AmpRelationship>,
/// Opaque `env` payload from the file; omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub env: Option<Value>,
}
/// One message of a thread in the API representation.
#[derive(Serialize, ToSchema)]
pub struct AmpMessage {
/// Message role string, copied from the file.
pub role: String,
/// Numeric message identifier, copied from the file.
pub message_id: u64,
/// Typed content blocks converted from the file's raw JSON blocks.
pub content: Vec<AmpContentBlock>,
/// Token usage; omitted from JSON when the message has none.
#[serde(skip_serializing_if = "Option::is_none")]
pub usage: Option<AmpUsage>,
/// Message state; omitted from JSON when the message has none.
#[serde(skip_serializing_if = "Option::is_none")]
pub state: Option<AmpMessageState>,
}
/// A typed content block of a message.
///
/// Serialized as an internally tagged enum: each variant becomes a JSON
/// object with a `type` key holding the snake_case variant name
/// (`text`, `thinking`, `tool_use`, `tool_result`, `unknown`).
#[derive(Serialize, ToSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AmpContentBlock {
/// Plain text content.
Text {
text: String,
},
/// Thinking content; `convert_content_block` also maps
/// `redacted_thinking` blocks to this variant.
Thinking {
thinking: String,
},
/// A tool invocation with its identifier, tool name and JSON input.
ToolUse {
id: String,
name: String,
input: Value,
},
/// The outcome of a tool invocation, linked by `tool_use_id`.
ToolResult {
tool_use_id: String,
run: AmpToolRun,
},
/// Any block whose type is not recognised; carries the original type
/// string when one is available.
Unknown {
#[serde(skip_serializing_if = "Option::is_none")]
original_type: Option<String>,
},
}
/// Outcome of a tool run attached to a `tool_result` block.
#[derive(Serialize, ToSchema)]
pub struct AmpToolRun {
/// Status string from the block's `run.status`; `"unknown"` when missing.
pub status: String,
/// Raw `run.result` JSON; omitted when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
/// Raw `run.error` JSON; omitted when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<Value>,
}
/// Token usage of a message in the API representation.
///
/// All counters are omitted from the serialized JSON when `None`.
#[derive(Serialize, ToSchema)]
pub struct AmpUsage {
/// Model name; an empty string when the file did not name a model.
pub model: String,
/// Input token count, if reported.
#[serde(skip_serializing_if = "Option::is_none")]
pub input_tokens: Option<u64>,
/// Output token count, if reported.
#[serde(skip_serializing_if = "Option::is_none")]
pub output_tokens: Option<u64>,
/// Cache-creation input token count, if reported.
#[serde(skip_serializing_if = "Option::is_none")]
pub cache_creation_input_tokens: Option<u64>,
/// Cache-read input token count, if reported.
#[serde(skip_serializing_if = "Option::is_none")]
pub cache_read_input_tokens: Option<u64>,
/// Total input token count, if reported.
#[serde(skip_serializing_if = "Option::is_none")]
pub total_input_tokens: Option<u64>,
}
/// State of a message in the API representation.
///
/// Note that the file's `type` key is exposed here as `state_type`.
#[derive(Serialize, ToSchema)]
pub struct AmpMessageState {
/// State type string copied from the file's `type` key.
pub state_type: String,
/// Stop reason; omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub stop_reason: Option<String>,
}
/// A link to another thread in the API representation.
#[derive(Serialize, ToSchema)]
pub struct AmpRelationship {
/// Identifier of the related thread (the file's `threadID`).
pub thread_id: String,
/// Relationship type string (the file's `type`).
pub rel_type: String,
/// Optional role string; omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub role: Option<String>,
}
/// Query parameters accepted by the thread list endpoint.
#[derive(Deserialize, IntoParams, ToSchema)]
pub struct AmpThreadListQuery {
/// Maximum number of threads to return. Defaults to 50; the handler
/// additionally caps the value at 200.
#[serde(default = "default_limit")]
pub limit: usize,
/// Number of matching threads to skip. Defaults to 0.
#[serde(default)]
pub offset: usize,
/// Filter on whether a thread has at least one message. Defaults to
/// `Some(true)` when the parameter is omitted, so threads without messages
/// are hidden unless `has_messages=false` is passed.
#[serde(default = "default_true")]
pub has_messages: Option<bool>,
}
/// Serde default for `AmpThreadListQuery::limit`: the page size used when the
/// client does not pass `limit`.
const fn default_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 50;
    DEFAULT_PAGE_SIZE
}
/// Serde default for `AmpThreadListQuery::has_messages`: filter to threads
/// that have messages unless the client says otherwise.
// The `Option` wrapper is required because the target field is `Option<bool>`
// and serde's `default = "..."` function must return the field's exact type.
#[allow(clippy::unnecessary_wraps)]
const fn default_true() -> Option<bool> {
    let only_with_messages = true;
    Some(only_with_messages)
}
/// Reads the thread file at `path` and condenses it into an [`AmpThreadSummary`].
///
/// Returns `None` when the file cannot be opened, its metadata cannot be
/// read, or its contents do not deserialize as a thread.
///
/// Token totals are accumulated over assistant messages that carry a usage
/// record; missing counters count as zero. The totals are `None` when no
/// assistant message has usage data. `last_model` is the model named by the
/// last such usage record that names one.
fn parse_summary(path: &std::path::Path) -> Option<AmpThreadSummary> {
    let file = File::open(path).ok()?;
    let file_size_bytes = file.metadata().ok()?.len();
    let raw: RawThreadSummary = serde_json::from_reader(BufReader::new(file)).ok()?;

    let mut last_model: Option<&String> = None;
    let mut total_input: u64 = 0;
    let mut total_output: u64 = 0;
    let mut has_usage = false;

    let assistant_usage = raw
        .messages
        .iter()
        .filter(|msg| msg.role == "assistant")
        .filter_map(|msg| msg.usage.as_ref());
    for usage in assistant_usage {
        if let Some(model) = usage.model.as_ref() {
            last_model = Some(model);
        }
        // Counters come from an on-disk file; saturate instead of overflowing
        // (which would panic in debug builds and wrap in release builds).
        total_input = total_input.saturating_add(usage.input_tokens.unwrap_or(0));
        total_output = total_output.saturating_add(usage.output_tokens.unwrap_or(0));
        has_usage = true;
    }

    // Clone the model name once, after the scan, rather than once per message.
    let last_model = last_model.cloned();
    let message_count = raw.messages.len();

    Some(AmpThreadSummary {
        id: raw.id,
        created: raw.created,
        title: raw.title,
        message_count,
        agent_mode: raw.agent_mode,
        last_model,
        total_input_tokens: has_usage.then_some(total_input),
        total_output_tokens: has_usage.then_some(total_output),
        file_size_bytes,
    })
}
/// Converts one raw JSON content block into a typed [`AmpContentBlock`].
///
/// The block's `type` string selects the variant:
///
/// * `text` — reads `text`.
/// * `thinking` / `redacted_thinking` — reads `thinking`, or `data` when the
///   `thinking` key is absent.
/// * `tool_use` — reads `id`, `name` and `input` (`input` defaults to JSON null).
/// * `tool_result` — reads `toolUseID` and the nested `run` object; a missing
///   `run.status` becomes `"unknown"`.
/// * anything else — [`AmpContentBlock::Unknown`].
///
/// Missing or non-string text fields become empty strings. For unknown
/// blocks, `original_type` carries the `type` string when the block has one
/// and is `None` when `type` is missing or not a string, so the field is
/// omitted from the response instead of being serialized as `""`.
fn convert_content_block(v: &Value) -> AmpContentBlock {
    // Reads an optional JSON value as an owned string, defaulting to "".
    let string_or_empty = |field: Option<&Value>| -> String {
        field
            .and_then(Value::as_str)
            .unwrap_or_default()
            .to_owned()
    };

    let block_type = v.get("type").and_then(Value::as_str);
    match block_type {
        Some("text") => AmpContentBlock::Text {
            text: string_or_empty(v.get("text")),
        },
        Some("thinking" | "redacted_thinking") => AmpContentBlock::Thinking {
            // Fall back to `data` only when the `thinking` key is absent.
            thinking: string_or_empty(v.get("thinking").or_else(|| v.get("data"))),
        },
        Some("tool_use") => AmpContentBlock::ToolUse {
            id: string_or_empty(v.get("id")),
            name: string_or_empty(v.get("name")),
            input: v.get("input").cloned().unwrap_or(Value::Null),
        },
        Some("tool_result") => {
            let run = v.get("run");
            let status = run
                .and_then(|r| r.get("status"))
                .and_then(Value::as_str)
                .unwrap_or("unknown")
                .to_owned();
            AmpContentBlock::ToolResult {
                tool_use_id: string_or_empty(v.get("toolUseID")),
                run: AmpToolRun {
                    status,
                    result: run.and_then(|r| r.get("result")).cloned(),
                    error: run.and_then(|r| r.get("error")).cloned(),
                },
            }
        }
        unrecognised => AmpContentBlock::Unknown {
            original_type: unrecognised.map(str::to_owned),
        },
    }
}
fn convert_message(raw: RawMessage) -> AmpMessage {
AmpMessage {
role: raw.role,
message_id: raw.message_id,
content: raw.content.iter().map(convert_content_block).collect(),
usage: raw.usage.map(|u| AmpUsage {
model: u.model.unwrap_or_default(),
input_tokens: u.input_tokens,
output_tokens: u.output_tokens,
cache_creation_input_tokens: u.cache_creation_input_tokens,
cache_read_input_tokens: u.cache_read_input_tokens,
total_input_tokens: u.total_input_tokens,
}),
state: raw.state.map(|s| AmpMessageState {
state_type: s.state_type,
stop_reason: s.stop_reason,
}),
}
}
fn parse_detail(path: &std::path::Path) -> Result<AmpThreadDetail, String> {
let file = File::open(path).map_err(|e| e.to_string())?;
let raw: RawThread =
serde_json::from_reader(BufReader::new(file)).map_err(|e| e.to_string())?;
Ok(AmpThreadDetail {
id: raw.id,
v: raw.v,
created: raw.created,
title: raw.title,
agent_mode: raw.agent_mode,
messages: raw.messages.into_iter().map(convert_message).collect(),
relationships: raw
.relationships
.into_iter()
.map(|r| AmpRelationship {
thread_id: r.thread_id,
rel_type: r.rel_type,
role: r.role,
})
.collect(),
env: raw.env,
})
}
/// In-memory index of thread summaries.
///
/// The summary list is held in an `ArcSwap`, so readers load the current
/// list through `list` while `watch` replaces it wholesale after a rescan.
pub struct AmpThreadIndex {
/// Current list of summaries; replaced as a whole on every rebuild.
summaries: ArcSwap<Vec<AmpThreadSummary>>,
}
impl AmpThreadIndex {
#[must_use]
pub fn build() -> Self {
let summaries = scan_all_summaries();
Self {
summaries: ArcSwap::from_pointee(summaries),
}
}
#[must_use]
pub fn empty() -> Self {
Self {
summaries: ArcSwap::from_pointee(Vec::new()),
}
}
pub fn list(&self) -> arc_swap::Guard<Arc<Vec<AmpThreadSummary>>> {
self.summaries.load()
}
pub fn watch(self: &Arc<Self>) {
use notify::{RecursiveMode, Watcher as _};
let index = Arc::clone(self);
let dir = threads_dir();
tokio::task::spawn_blocking(move || {
if !dir.is_dir() {
tracing::debug!(path = %dir.display(), "amp threads dir not found, skipping watch");
return;
}
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher =
notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
if let Ok(ev) = res {
let dominated_by_json = ev.paths.iter().any(|p| {
p.extension()
.is_some_and(|e| e.eq_ignore_ascii_case("json"))
});
if dominated_by_json {
let _ = tx.send(());
}
}
})
.expect("failed to create file watcher");
watcher
.watch(&dir, RecursiveMode::NonRecursive)
.expect("failed to watch amp threads directory");
tracing::info!(path = %dir.display(), "watching amp threads directory");
while rx.recv().is_ok() {
while rx.try_recv().is_ok() {}
std::thread::sleep(std::time::Duration::from_millis(500));
while rx.try_recv().is_ok() {}
let new = scan_all_summaries();
tracing::debug!(count = new.len(), "amp thread index rebuilt");
index.summaries.store(Arc::new(new));
}
});
}
}
/// Scans the threads directory and returns a summary for every parsable
/// thread file, newest first.
///
/// Only entries whose name starts with `T-` and has a `json` extension
/// (case-insensitive) are considered. Unreadable directory entries and files
/// that fail to parse are skipped. An unreadable directory yields an empty
/// list.
fn scan_all_summaries() -> Vec<AmpThreadSummary> {
    let dir = threads_dir();
    let entries = match fs::read_dir(&dir) {
        Ok(entries) => entries,
        Err(_) => return Vec::new(),
    };

    let mut summaries: Vec<AmpThreadSummary> = Vec::new();
    for entry in entries {
        let Ok(entry) = entry else {
            continue;
        };
        let name = entry.file_name().to_string_lossy().to_string();
        let has_thread_prefix = name.starts_with("T-");
        let is_json = std::path::Path::new(&name)
            .extension()
            .is_some_and(|ext| ext.eq_ignore_ascii_case("json"));
        if !(has_thread_prefix && is_json) {
            continue;
        }
        if let Some(summary) = parse_summary(&entry.path()) {
            summaries.push(summary);
        }
    }

    // Descending by creation time.
    summaries.sort_unstable_by(|a, b| a.created.cmp(&b.created).reverse());
    summaries
}
#[utoipa::path(
get,
path = "/v0/management/amp/threads",
params(AmpThreadListQuery),
responses((status = 200, body = AmpThreadListResponse)),
tag = "management"
)]
pub async fn list_threads(
State(state): State<Arc<AppState>>,
Query(q): Query<AmpThreadListQuery>,
) -> Json<AmpThreadListResponse> {
let all = state.amp_threads.list();
let filtered: Vec<&AmpThreadSummary> = all
.iter()
.filter(|s| {
if let Some(want) = q.has_messages {
(s.message_count > 0) == want
} else {
true
}
})
.collect();
let total = filtered.len();
let limit = q.limit.min(200);
let offset = q.offset.min(total);
let threads: Vec<AmpThreadSummary> = filtered
.into_iter()
.skip(offset)
.take(limit)
.map(clone_summary)
.collect();
Json(AmpThreadListResponse { threads, total })
}
/// Produces an owned copy of a summary (the type does not derive `Clone`).
fn clone_summary(s: &AmpThreadSummary) -> AmpThreadSummary {
    // Exhaustive destructuring: adding a field to the struct without copying
    // it here becomes a compile error.
    let AmpThreadSummary {
        id,
        created,
        title,
        message_count,
        agent_mode,
        last_model,
        total_input_tokens,
        total_output_tokens,
        file_size_bytes,
    } = s;

    AmpThreadSummary {
        id: id.clone(),
        created: *created,
        title: title.clone(),
        message_count: *message_count,
        agent_mode: agent_mode.clone(),
        last_model: last_model.clone(),
        total_input_tokens: *total_input_tokens,
        total_output_tokens: *total_output_tokens,
        file_size_bytes: *file_size_bytes,
    }
}
#[utoipa::path(
get,
path = "/v0/management/amp/threads/{id}",
params(("id" = String, Path, description = "Thread ID (e.g. T-019d38dd-45f9-7617-8e7f-03b730ba197a)")),
responses(
(status = 200, body = AmpThreadDetail),
(status = 400, description = "Invalid thread ID format"),
(status = 404, description = "Thread not found"),
),
tag = "management"
)]
pub async fn get_thread(Path(id): Path<String>) -> Response {
if !is_valid_thread_id(&id) {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": {"message": "invalid thread ID format", "type": "invalid_request_error"}
})),
)
.into_response();
}
let path = threads_dir().join(format!("{id}.json"));
let result = tokio::task::spawn_blocking(move || {
if !path.exists() {
return Err(StatusCode::NOT_FOUND);
}
parse_detail(&path).map_err(|e| {
tracing::error!(error = %e, "failed to parse amp thread");
StatusCode::INTERNAL_SERVER_ERROR
})
})
.await
.unwrap_or(Err(StatusCode::INTERNAL_SERVER_ERROR));
match result {
Ok(detail) => Json(detail).into_response(),
Err(StatusCode::NOT_FOUND) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({
"error": {"message": "thread not found", "type": "not_found"}
})),
)
.into_response(),
Err(status) => (
status,
Json(serde_json::json!({
"error": {"message": "failed to parse thread", "type": "server_error"}
})),
)
.into_response(),
}
}
// Unit tests for thread-id validation, raw thread deserialization and
// content-block conversion. They work on in-memory JSON only and never touch
// the file system.
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// Identifiers made of `T-` followed by hex digits and dashes are accepted.
#[test]
fn valid_thread_ids() {
assert!(is_valid_thread_id("T-019d38dd-45f9-7617-8e7f-03b730ba197a"));
assert!(is_valid_thread_id("T-fc68e9f5-9621-4ee2-b8d9-d954ba656de4"));
assert!(is_valid_thread_id("T-abcdef0123456789"));
}
// Empty ids, a bare prefix, path-traversal attempts, whitespace and ids
// without the `T-` prefix are all rejected.
#[test]
fn invalid_thread_ids() {
assert!(!is_valid_thread_id(""));
assert!(!is_valid_thread_id("T-"));
assert!(!is_valid_thread_id("../etc/passwd"));
assert!(!is_valid_thread_id("T-../../foo"));
assert!(!is_valid_thread_id("T-abc def"));
assert!(!is_valid_thread_id("not-a-thread"));
}
// A thread with no messages and no title deserializes into the summary
// view; the undeclared `nextMessageId` key is ignored.
#[test]
fn parse_empty_thread_json() {
let json_str =
r#"{"v":0,"id":"T-test-1234","created":1711728000000,"messages":[],"nextMessageId":0}"#;
let raw: RawThreadSummary = serde_json::from_str(json_str).unwrap();
assert_eq!(raw.id, "T-test-1234");
assert!(raw.messages.is_empty());
assert!(raw.title.is_none());
}
// End-to-end conversion of a three-message thread: a user text message, an
// assistant message with thinking + tool_use blocks and usage data, and a
// user message carrying the tool result.
#[test]
fn parse_thread_with_messages() {
let json_str = json!({
"v": 5,
"id": "T-test-5678",
"created": 1_711_728_000_000_u64,
"messages": [
{
"role": "user",
"messageId": 0,
"content": [{"type": "text", "text": "hello"}]
},
{
"role": "assistant",
"messageId": 1,
"content": [
{"type": "thinking", "thinking": "hmm", "signature": "sig"},
{"type": "tool_use", "id": "toolu_01", "name": "Bash", "input": {"cmd": "ls"}, "complete": true},
],
"usage": {
"model": "claude-opus-4-6",
"inputTokens": 100,
"outputTokens": 50,
"cacheCreationInputTokens": 10,
"cacheReadInputTokens": 5,
"totalInputTokens": 115
},
"state": {"type": "complete", "stopReason": "tool_use"}
},
{
"role": "user",
"messageId": 2,
"content": [{
"type": "tool_result",
"toolUseID": "toolu_01",
"run": {"status": "done", "result": {"output": "file.txt", "exitCode": 0}}
}]
}
],
"agentMode": "smart",
"title": "Test thread",
"nextMessageId": 3
});
let raw: RawThread = serde_json::from_value(json_str).unwrap();
assert_eq!(raw.messages.len(), 3);
assert_eq!(raw.agent_mode.as_deref(), Some("smart"));
// Build the detail by hand (mirrors `parse_detail` without file I/O).
let detail = AmpThreadDetail {
id: raw.id.clone(),
v: raw.v,
created: raw.created,
title: raw.title.clone(),
agent_mode: raw.agent_mode.clone(),
messages: raw.messages.into_iter().map(convert_message).collect(),
relationships: Vec::new(),
env: None,
};
assert_eq!(detail.messages.len(), 3);
assert_eq!(detail.messages[0].role, "user");
assert_eq!(detail.messages[1].role, "assistant");
assert!(detail.messages[1].usage.is_some());
let usage = detail.messages[1].usage.as_ref().unwrap();
assert_eq!(usage.model, "claude-opus-4-6");
assert_eq!(usage.input_tokens, Some(100));
assert_eq!(usage.output_tokens, Some(50));
// Each raw block type maps to the matching typed variant.
assert!(matches!(
&detail.messages[1].content[0],
AmpContentBlock::Thinking { .. }
));
assert!(
matches!(&detail.messages[1].content[1], AmpContentBlock::ToolUse { name, .. } if name == "Bash")
);
assert!(matches!(
&detail.messages[2].content[0],
AmpContentBlock::ToolResult { .. }
));
}
// An unrecognised block type is preserved in `Unknown::original_type`.
#[test]
fn convert_unknown_content_block() {
let block = json!({"type": "some_future_type", "data": 42});
let result = convert_content_block(&block);
assert!(
matches!(result, AmpContentBlock::Unknown { original_type: Some(t) } if t == "some_future_type")
);
}
// The summary view tolerates keys it does not declare (`content`,
// `userState`, `env`, `meta`, `~debug`, ...) instead of failing.
#[test]
fn summary_deserialization_skips_heavy_fields() {
let json_str = json!({
"v": 100,
"id": "T-skip-test",
"created": 1_711_728_000_000_u64,
"messages": [{
"role": "user",
"messageId": 0,
"content": [{"type": "text", "text": "this should be skipped by summary parser"}],
"userState": {"activeEditor": "foo.rs"},
"fileMentions": {"files": []}
}],
"nextMessageId": 1,
"env": {"initial": {"platform": {"os": "darwin"}}},
"meta": {"traces": []},
"~debug": {"something": true}
});
let raw: RawThreadSummary = serde_json::from_value(json_str).unwrap();
assert_eq!(raw.id, "T-skip-test");
assert_eq!(raw.messages.len(), 1);
}
}