1use arc_swap::ArcSwap;
9use axum::{
10 Json,
11 extract::{Path, Query, State},
12 http::StatusCode,
13 response::{IntoResponse, Response},
14};
15use serde::{Deserialize, Serialize};
16use serde_json::Value;
17use std::{
18 fs::{self, File},
19 io::BufReader,
20 path::PathBuf,
21 sync::Arc,
22};
23use utoipa::{IntoParams, ToSchema};
24
25use crate::AppState;
26
/// Returns the directory that holds the Amp thread JSON files.
///
/// The path is `<HOME>/.local/share/amp/threads`. When the `HOME` environment
/// variable cannot be read (unset or not valid Unicode), `/tmp` is used as the
/// base directory instead.
fn threads_dir() -> PathBuf {
    let base = match std::env::var("HOME") {
        Ok(home) => PathBuf::from(home),
        Err(_) => PathBuf::from("/tmp"),
    };
    base.join(".local/share/amp/threads")
}
37
/// Checks that `id` has the shape `T-<suffix>`, where the suffix is non-empty
/// and consists solely of ASCII hex digits and dashes.
///
/// Because no other characters are accepted (no `/`, `.`, or whitespace), an
/// ID that passes this check cannot introduce extra path components when it is
/// used to build a file name.
fn is_valid_thread_id(id: &str) -> bool {
    match id.strip_prefix("T-") {
        Some(suffix) if !suffix.is_empty() => suffix
            .bytes()
            .all(|b| b.is_ascii_hexdigit() || b == b'-'),
        _ => false,
    }
}
45
/// Minimal view of an on-disk thread file, used when building list summaries.
///
/// Keys are read in camelCase. Only the fields declared here are captured;
/// any other keys present in the file are not stored.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawThreadSummary {
    /// Thread identifier as stored in the file.
    id: String,
    /// Creation timestamp (presumably Unix epoch milliseconds — TODO confirm).
    created: u64,
    /// Optional thread title; `None` when the key is absent.
    #[serde(default)]
    title: Option<String>,
    /// Per-message stubs; an absent key yields an empty list.
    #[serde(default)]
    messages: Vec<RawMessageStub>,
    /// Optional agent mode string (read from `agentMode`).
    #[serde(default)]
    agent_mode: Option<String>,
}
60
/// Reduced message record that keeps only what the summary needs:
/// the role and the optional usage block.
#[derive(Deserialize)]
struct RawMessageStub {
    /// Message role; the summary code compares it against `"assistant"`.
    role: String,
    /// Token usage attached to the message, if any.
    #[serde(default)]
    usage: Option<RawUsageStub>,
}
67
/// Subset of a message's usage block consumed by the summary parser
/// (keys read in camelCase).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawUsageStub {
    /// Model name reported for the message, if present.
    model: Option<String>,
    /// Input token count (`inputTokens`), if present.
    input_tokens: Option<u64>,
    /// Output token count (`outputTokens`), if present.
    output_tokens: Option<u64>,
}
75
/// Full on-disk thread record, deserialized for the thread detail endpoint
/// (keys read in camelCase).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawThread {
    /// Required numeric `v` field (presumably a version/revision counter — TODO confirm).
    v: u64,
    /// Thread identifier as stored in the file.
    id: String,
    /// Creation timestamp (presumably Unix epoch milliseconds — TODO confirm).
    created: u64,
    /// Optional thread title.
    #[serde(default)]
    title: Option<String>,
    /// Messages in file order; an absent key yields an empty list.
    #[serde(default)]
    messages: Vec<RawMessage>,
    /// Optional agent mode string (read from `agentMode`).
    #[serde(default)]
    agent_mode: Option<String>,
    /// Links to other threads; an absent key yields an empty list.
    #[serde(default)]
    relationships: Vec<RawRelationship>,
    /// Arbitrary environment JSON, kept as an untyped value.
    #[serde(default)]
    env: Option<Value>,
}
93
/// A single message as stored in a thread file (keys read in camelCase).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawMessage {
    /// Message role string (e.g. `"user"` or `"assistant"`).
    role: String,
    /// Required numeric message identifier (`messageId`).
    message_id: u64,
    /// Content blocks kept as untyped JSON; they are interpreted later by
    /// `convert_content_block`.
    #[serde(default)]
    content: Vec<Value>,
    /// Token usage for this message, if recorded.
    #[serde(default)]
    usage: Option<RawUsage>,
    /// Message state, if recorded.
    #[serde(default)]
    state: Option<RawMessageState>,
}
106
/// Full usage block of a message (keys read in camelCase). Every field is
/// optional and becomes `None` when absent.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawUsage {
    /// Model name.
    #[serde(default)]
    model: Option<String>,
    /// `inputTokens` count.
    #[serde(default)]
    input_tokens: Option<u64>,
    /// `outputTokens` count.
    #[serde(default)]
    output_tokens: Option<u64>,
    /// `cacheCreationInputTokens` count.
    #[serde(default)]
    cache_creation_input_tokens: Option<u64>,
    /// `cacheReadInputTokens` count.
    #[serde(default)]
    cache_read_input_tokens: Option<u64>,
    /// `totalInputTokens` count.
    #[serde(default)]
    total_input_tokens: Option<u64>,
}
123
/// State object attached to a message (keys read in camelCase).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawMessageState {
    /// Value of the JSON `type` key (renamed because `type` is a Rust keyword).
    #[serde(rename = "type")]
    state_type: String,
    /// Optional `stopReason` string.
    #[serde(default)]
    stop_reason: Option<String>,
}
132
/// A link from this thread to another thread, as stored on disk.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawRelationship {
    /// Identifier of the related thread; the on-disk key is spelled `threadID`,
    /// which the camelCase rule would not produce, hence the explicit rename.
    #[serde(rename = "threadID")]
    thread_id: String,
    /// Value of the JSON `type` key describing the relationship.
    #[serde(rename = "type")]
    rel_type: String,
    /// Optional role string for the relationship.
    #[serde(default)]
    role: Option<String>,
}
143
/// Response body of the thread list endpoint.
#[derive(Serialize, ToSchema)]
pub struct AmpThreadListResponse {
    /// The requested page of thread summaries.
    pub threads: Vec<AmpThreadSummary>,
    /// Number of threads matching the filter, counted before pagination.
    pub total: usize,
}
154
/// Summary of one thread as returned by the list endpoint.
#[derive(Serialize, ToSchema)]
pub struct AmpThreadSummary {
    /// Thread identifier.
    pub id: String,
    /// Creation timestamp copied from the thread file
    /// (presumably Unix epoch milliseconds — TODO confirm).
    pub created: u64,
    /// Thread title; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    /// Number of messages in the thread.
    pub message_count: usize,
    /// Agent mode string; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_mode: Option<String>,
    /// Model named by the most recent assistant message whose usage carries a
    /// model; omitted when there is none.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_model: Option<String>,
    /// Sum of input tokens over assistant messages that carry usage; omitted
    /// when no assistant message has usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_input_tokens: Option<u64>,
    /// Sum of output tokens over assistant messages that carry usage; omitted
    /// when no assistant message has usage.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_output_tokens: Option<u64>,
    /// Size of the thread file on disk, in bytes.
    pub file_size_bytes: u64,
}
179
/// Full thread payload returned by the thread detail endpoint.
#[derive(Serialize, ToSchema)]
pub struct AmpThreadDetail {
    /// Thread identifier.
    pub id: String,
    /// The thread file's `v` value (presumably a version/revision counter — TODO confirm).
    pub v: u64,
    /// Creation timestamp copied from the thread file
    /// (presumably Unix epoch milliseconds — TODO confirm).
    pub created: u64,
    /// Thread title; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    /// Agent mode string; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub agent_mode: Option<String>,
    /// Converted messages, in file order.
    pub messages: Vec<AmpMessage>,
    /// Links to other threads; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub relationships: Vec<AmpRelationship>,
    /// Environment JSON passed through untouched; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub env: Option<Value>,
}
199
/// One message of a thread in the API representation.
#[derive(Serialize, ToSchema)]
pub struct AmpMessage {
    /// Message role string (e.g. `"user"` or `"assistant"`).
    pub role: String,
    /// Numeric message identifier copied from the thread file.
    pub message_id: u64,
    /// Typed content blocks converted from the raw JSON blocks.
    pub content: Vec<AmpContentBlock>,
    /// Token usage; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<AmpUsage>,
    /// Message state; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub state: Option<AmpMessageState>,
}
212
/// A content block inside a message.
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case under the `type` key (`text`, `thinking`, `tool_use`,
/// `tool_result`, `unknown`) next to the variant's own fields.
#[derive(Serialize, ToSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AmpContentBlock {
    /// Plain text content.
    Text {
        /// The text payload.
        text: String,
    },
    /// Model thinking content.
    Thinking {
        /// The thinking payload.
        thinking: String,
    },
    /// A tool invocation.
    ToolUse {
        /// Identifier of the tool call.
        id: String,
        /// Name of the tool being called.
        name: String,
        /// Tool input as untyped JSON.
        input: Value,
    },
    /// The outcome of a tool invocation.
    ToolResult {
        /// Identifier of the tool call this result belongs to.
        tool_use_id: String,
        /// Status and payload of the tool run.
        run: AmpToolRun,
    },
    /// A block whose type is not one of the recognized kinds.
    Unknown {
        /// The unrecognized type string; omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        original_type: Option<String>,
    },
}
238
/// Status and payload of a tool run attached to a tool result block.
#[derive(Serialize, ToSchema)]
pub struct AmpToolRun {
    /// Run status string as found in the thread file.
    pub status: String,
    /// Result payload as untyped JSON; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    /// Error payload as untyped JSON; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<Value>,
}
249
/// Token usage of a message in the API representation. All counters are
/// omitted from the JSON when absent.
#[derive(Serialize, ToSchema)]
pub struct AmpUsage {
    /// Model name (always serialized, unlike the optional counters).
    pub model: String,
    /// Input token count.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_tokens: Option<u64>,
    /// Output token count.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_tokens: Option<u64>,
    /// Cache-creation input token count.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_creation_input_tokens: Option<u64>,
    /// Cache-read input token count.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cache_read_input_tokens: Option<u64>,
    /// Total input token count as recorded in the thread file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_input_tokens: Option<u64>,
}
265
/// State of a message in the API representation.
#[derive(Serialize, ToSchema)]
pub struct AmpMessageState {
    /// State kind, taken from the raw state's `type` key.
    pub state_type: String,
    /// Stop reason string; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stop_reason: Option<String>,
}
273
/// A link from a thread to another thread in the API representation.
#[derive(Serialize, ToSchema)]
pub struct AmpRelationship {
    /// Identifier of the related thread.
    pub thread_id: String,
    /// Relationship kind, taken from the raw relationship's `type` key.
    pub rel_type: String,
    /// Role string for the relationship; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub role: Option<String>,
}
284
/// Query parameters accepted by the thread list endpoint.
#[derive(Deserialize, IntoParams, ToSchema)]
pub struct AmpThreadListQuery {
    /// Page size; defaults to 50 when omitted. The list handler caps it at 200.
    #[serde(default = "default_limit")]
    pub limit: usize,
    /// Number of matching threads to skip; defaults to 0 when omitted.
    #[serde(default)]
    pub offset: usize,
    /// Filter on whether a thread has at least one message. Defaults to
    /// `Some(true)` when omitted, i.e. only non-empty threads are listed.
    #[serde(default = "default_true")]
    pub has_messages: Option<bool>,
}
298
/// Serde default for `AmpThreadListQuery::limit`: the page size used when the
/// client does not supply one.
const fn default_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 50;
    DEFAULT_PAGE_SIZE
}
302
/// Serde default for `AmpThreadListQuery::has_messages`: filter to threads
/// that contain messages unless the client says otherwise.
// The `Option` wrapper is required because serde's `default = "..."` function
// must return the field's exact type, so the clippy lint is silenced here.
#[allow(clippy::unnecessary_wraps)]
const fn default_true() -> Option<bool> {
    let only_with_messages = true;
    Some(only_with_messages)
}
307
308fn parse_summary(path: &std::path::Path) -> Option<AmpThreadSummary> {
311 let file = File::open(path).ok()?;
312 let file_size = file.metadata().ok()?.len();
313 let raw: RawThreadSummary = serde_json::from_reader(BufReader::new(file)).ok()?;
314
315 let mut last_model: Option<String> = None;
316 let mut sum_input: u64 = 0;
317 let mut sum_output: u64 = 0;
318 let mut has_usage = false;
319
320 for msg in &raw.messages {
321 if msg.role == "assistant"
322 && let Some(u) = &msg.usage
323 {
324 if let Some(m) = &u.model {
325 last_model = Some(m.clone());
326 }
327 sum_input += u.input_tokens.unwrap_or(0);
328 sum_output += u.output_tokens.unwrap_or(0);
329 has_usage = true;
330 }
331 }
332
333 Some(AmpThreadSummary {
334 message_count: raw.messages.len(),
335 id: raw.id,
336 created: raw.created,
337 title: raw.title,
338 agent_mode: raw.agent_mode,
339 last_model,
340 total_input_tokens: has_usage.then_some(sum_input),
341 total_output_tokens: has_usage.then_some(sum_output),
342 file_size_bytes: file_size,
343 })
344}
345
346fn convert_content_block(v: &Value) -> AmpContentBlock {
348 let block_type = v.get("type").and_then(Value::as_str).unwrap_or("");
349 match block_type {
350 "text" => AmpContentBlock::Text {
351 text: v
352 .get("text")
353 .and_then(Value::as_str)
354 .unwrap_or("")
355 .to_string(),
356 },
357 "thinking" | "redacted_thinking" => AmpContentBlock::Thinking {
358 thinking: v
359 .get("thinking")
360 .or_else(|| v.get("data"))
361 .and_then(Value::as_str)
362 .unwrap_or("")
363 .to_string(),
364 },
365 "tool_use" => AmpContentBlock::ToolUse {
366 id: v
367 .get("id")
368 .and_then(Value::as_str)
369 .unwrap_or("")
370 .to_string(),
371 name: v
372 .get("name")
373 .and_then(Value::as_str)
374 .unwrap_or("")
375 .to_string(),
376 input: v.get("input").cloned().unwrap_or(Value::Null),
377 },
378 "tool_result" => {
379 let run_val = v.get("run");
380 AmpContentBlock::ToolResult {
381 tool_use_id: v
382 .get("toolUseID")
383 .and_then(Value::as_str)
384 .unwrap_or("")
385 .to_string(),
386 run: AmpToolRun {
387 status: run_val
388 .and_then(|r| r.get("status"))
389 .and_then(Value::as_str)
390 .unwrap_or("unknown")
391 .to_string(),
392 result: run_val.and_then(|r| r.get("result")).cloned(),
393 error: run_val.and_then(|r| r.get("error")).cloned(),
394 },
395 }
396 }
397 _ => AmpContentBlock::Unknown {
398 original_type: Some(block_type.to_string()),
399 },
400 }
401}
402
403fn convert_message(raw: RawMessage) -> AmpMessage {
404 AmpMessage {
405 role: raw.role,
406 message_id: raw.message_id,
407 content: raw.content.iter().map(convert_content_block).collect(),
408 usage: raw.usage.map(|u| AmpUsage {
409 model: u.model.unwrap_or_default(),
410 input_tokens: u.input_tokens,
411 output_tokens: u.output_tokens,
412 cache_creation_input_tokens: u.cache_creation_input_tokens,
413 cache_read_input_tokens: u.cache_read_input_tokens,
414 total_input_tokens: u.total_input_tokens,
415 }),
416 state: raw.state.map(|s| AmpMessageState {
417 state_type: s.state_type,
418 stop_reason: s.stop_reason,
419 }),
420 }
421}
422
423fn parse_detail(path: &std::path::Path) -> Result<AmpThreadDetail, String> {
424 let file = File::open(path).map_err(|e| e.to_string())?;
425 let raw: RawThread =
426 serde_json::from_reader(BufReader::new(file)).map_err(|e| e.to_string())?;
427
428 Ok(AmpThreadDetail {
429 id: raw.id,
430 v: raw.v,
431 created: raw.created,
432 title: raw.title,
433 agent_mode: raw.agent_mode,
434 messages: raw.messages.into_iter().map(convert_message).collect(),
435 relationships: raw
436 .relationships
437 .into_iter()
438 .map(|r| AmpRelationship {
439 thread_id: r.thread_id,
440 rel_type: r.rel_type,
441 role: r.role,
442 })
443 .collect(),
444 env: raw.env,
445 })
446}
447
/// In-memory index of thread summaries.
///
/// The list lives behind an `ArcSwap`: readers load the current `Arc`, and a
/// rebuild publishes a whole new vector with a single `store`.
pub struct AmpThreadIndex {
    /// Current summaries, replaced wholesale whenever the index is rebuilt.
    summaries: ArcSwap<Vec<AmpThreadSummary>>,
}
459
460impl AmpThreadIndex {
461 #[must_use]
470 pub fn build() -> Self {
471 let summaries = scan_all_summaries();
472 Self {
473 summaries: ArcSwap::from_pointee(summaries),
474 }
475 }
476
477 #[must_use]
479 pub fn empty() -> Self {
480 Self {
481 summaries: ArcSwap::from_pointee(Vec::new()),
482 }
483 }
484
485 pub fn list(&self) -> arc_swap::Guard<Arc<Vec<AmpThreadSummary>>> {
487 self.summaries.load()
488 }
489
490 pub fn watch(self: &Arc<Self>) {
501 use notify::{RecursiveMode, Watcher as _};
502
503 let index = Arc::clone(self);
504 let dir = threads_dir();
505
506 tokio::task::spawn_blocking(move || {
507 if !dir.is_dir() {
508 tracing::debug!(path = %dir.display(), "amp threads dir not found, skipping watch");
509 return;
510 }
511
512 let (tx, rx) = std::sync::mpsc::channel();
513
514 let mut watcher =
515 notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
516 if let Ok(ev) = res {
517 let dominated_by_json = ev.paths.iter().any(|p| {
519 p.extension()
520 .is_some_and(|e| e.eq_ignore_ascii_case("json"))
521 });
522 if dominated_by_json {
523 let _ = tx.send(());
524 }
525 }
526 })
527 .expect("failed to create file watcher");
528
529 watcher
530 .watch(&dir, RecursiveMode::NonRecursive)
531 .expect("failed to watch amp threads directory");
532
533 tracing::info!(path = %dir.display(), "watching amp threads directory");
534
535 while rx.recv().is_ok() {
537 while rx.try_recv().is_ok() {}
539
540 std::thread::sleep(std::time::Duration::from_millis(500));
542
543 while rx.try_recv().is_ok() {}
545
546 let new = scan_all_summaries();
547 tracing::debug!(count = new.len(), "amp thread index rebuilt");
548 index.summaries.store(Arc::new(new));
549 }
550 });
551 }
552}
553
554fn scan_all_summaries() -> Vec<AmpThreadSummary> {
557 let dir = threads_dir();
558 let Ok(entries) = fs::read_dir(&dir) else {
559 return Vec::new();
560 };
561
562 let mut summaries: Vec<AmpThreadSummary> = entries
563 .filter_map(|entry| {
564 let entry = entry.ok()?;
565 let name = entry.file_name().to_string_lossy().to_string();
566 if !name.starts_with("T-")
567 || !std::path::Path::new(&name)
568 .extension()
569 .is_some_and(|ext| ext.eq_ignore_ascii_case("json"))
570 {
571 return None;
572 }
573 parse_summary(&entry.path())
574 })
575 .collect();
576
577 summaries.sort_unstable_by(|a, b| b.created.cmp(&a.created));
578 summaries
579}
580
581#[utoipa::path(
585 get,
586 path = "/v0/management/amp/threads",
587 params(AmpThreadListQuery),
588 responses((status = 200, body = AmpThreadListResponse)),
589 tag = "management"
590)]
591pub async fn list_threads(
592 State(state): State<Arc<AppState>>,
593 Query(q): Query<AmpThreadListQuery>,
594) -> Json<AmpThreadListResponse> {
595 let all = state.amp_threads.list();
596
597 let filtered: Vec<&AmpThreadSummary> = all
599 .iter()
600 .filter(|s| {
601 if let Some(want) = q.has_messages {
602 (s.message_count > 0) == want
603 } else {
604 true
605 }
606 })
607 .collect();
608
609 let total = filtered.len();
610 let limit = q.limit.min(200);
611 let offset = q.offset.min(total);
612
613 let threads: Vec<AmpThreadSummary> = filtered
614 .into_iter()
615 .skip(offset)
616 .take(limit)
617 .map(clone_summary)
618 .collect();
619
620 Json(AmpThreadListResponse { threads, total })
621}
622
623fn clone_summary(s: &AmpThreadSummary) -> AmpThreadSummary {
625 AmpThreadSummary {
626 id: s.id.clone(),
627 created: s.created,
628 title: s.title.clone(),
629 message_count: s.message_count,
630 agent_mode: s.agent_mode.clone(),
631 last_model: s.last_model.clone(),
632 total_input_tokens: s.total_input_tokens,
633 total_output_tokens: s.total_output_tokens,
634 file_size_bytes: s.file_size_bytes,
635 }
636}
637
638#[utoipa::path(
640 get,
641 path = "/v0/management/amp/threads/{id}",
642 params(("id" = String, Path, description = "Thread ID (e.g. T-019d38dd-45f9-7617-8e7f-03b730ba197a)")),
643 responses(
644 (status = 200, body = AmpThreadDetail),
645 (status = 400, description = "Invalid thread ID format"),
646 (status = 404, description = "Thread not found"),
647 ),
648 tag = "management"
649)]
650pub async fn get_thread(Path(id): Path<String>) -> Response {
651 if !is_valid_thread_id(&id) {
652 return (
653 StatusCode::BAD_REQUEST,
654 Json(serde_json::json!({
655 "error": {"message": "invalid thread ID format", "type": "invalid_request_error"}
656 })),
657 )
658 .into_response();
659 }
660
661 let path = threads_dir().join(format!("{id}.json"));
662
663 let result = tokio::task::spawn_blocking(move || {
664 if !path.exists() {
665 return Err(StatusCode::NOT_FOUND);
666 }
667 parse_detail(&path).map_err(|e| {
668 tracing::error!(error = %e, "failed to parse amp thread");
669 StatusCode::INTERNAL_SERVER_ERROR
670 })
671 })
672 .await
673 .unwrap_or(Err(StatusCode::INTERNAL_SERVER_ERROR));
674
675 match result {
676 Ok(detail) => Json(detail).into_response(),
677 Err(StatusCode::NOT_FOUND) => (
678 StatusCode::NOT_FOUND,
679 Json(serde_json::json!({
680 "error": {"message": "thread not found", "type": "not_found"}
681 })),
682 )
683 .into_response(),
684 Err(status) => (
685 status,
686 Json(serde_json::json!({
687 "error": {"message": "failed to parse thread", "type": "server_error"}
688 })),
689 )
690 .into_response(),
691 }
692}
693
// Unit tests for thread ID validation, raw deserialization, and content
// block conversion. None of them touch the filesystem.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// IDs made of `T-` followed by hex digits and dashes are accepted.
    #[test]
    fn valid_thread_ids() {
        assert!(is_valid_thread_id("T-019d38dd-45f9-7617-8e7f-03b730ba197a"));
        assert!(is_valid_thread_id("T-fc68e9f5-9621-4ee2-b8d9-d954ba656de4"));
        assert!(is_valid_thread_id("T-abcdef0123456789"));
    }

    /// Empty input, a bare prefix, path-traversal attempts, whitespace, and
    /// IDs without the `T-` prefix are all rejected.
    #[test]
    fn invalid_thread_ids() {
        assert!(!is_valid_thread_id(""));
        assert!(!is_valid_thread_id("T-"));
        assert!(!is_valid_thread_id("../etc/passwd"));
        assert!(!is_valid_thread_id("T-../../foo"));
        assert!(!is_valid_thread_id("T-abc def"));
        assert!(!is_valid_thread_id("not-a-thread"));
    }

    /// A thread with no messages and no title deserializes as a summary.
    #[test]
    fn parse_empty_thread_json() {
        let json_str =
            r#"{"v":0,"id":"T-test-1234","created":1711728000000,"messages":[],"nextMessageId":0}"#;
        let raw: RawThreadSummary = serde_json::from_str(json_str).unwrap();
        assert_eq!(raw.id, "T-test-1234");
        assert!(raw.messages.is_empty());
        assert!(raw.title.is_none());
    }

    /// A three-message thread (user text, assistant thinking + tool use,
    /// user tool result) deserializes and converts into the expected typed
    /// messages, usage, and content block variants.
    #[test]
    fn parse_thread_with_messages() {
        let json_str = json!({
            "v": 5,
            "id": "T-test-5678",
            "created": 1_711_728_000_000_u64,
            "messages": [
                {
                    "role": "user",
                    "messageId": 0,
                    "content": [{"type": "text", "text": "hello"}]
                },
                {
                    "role": "assistant",
                    "messageId": 1,
                    "content": [
                        {"type": "thinking", "thinking": "hmm", "signature": "sig"},
                        {"type": "tool_use", "id": "toolu_01", "name": "Bash", "input": {"cmd": "ls"}, "complete": true},
                    ],
                    "usage": {
                        "model": "claude-opus-4-6",
                        "inputTokens": 100,
                        "outputTokens": 50,
                        "cacheCreationInputTokens": 10,
                        "cacheReadInputTokens": 5,
                        "totalInputTokens": 115
                    },
                    "state": {"type": "complete", "stopReason": "tool_use"}
                },
                {
                    "role": "user",
                    "messageId": 2,
                    "content": [{
                        "type": "tool_result",
                        "toolUseID": "toolu_01",
                        "run": {"status": "done", "result": {"output": "file.txt", "exitCode": 0}}
                    }]
                }
            ],
            "agentMode": "smart",
            "title": "Test thread",
            "nextMessageId": 3
        });

        let raw: RawThread = serde_json::from_value(json_str).unwrap();
        assert_eq!(raw.messages.len(), 3);
        assert_eq!(raw.agent_mode.as_deref(), Some("smart"));

        // Assemble the detail by hand so the conversion is exercised without
        // reading a file.
        let detail = AmpThreadDetail {
            id: raw.id.clone(),
            v: raw.v,
            created: raw.created,
            title: raw.title.clone(),
            agent_mode: raw.agent_mode.clone(),
            messages: raw.messages.into_iter().map(convert_message).collect(),
            relationships: Vec::new(),
            env: None,
        };

        assert_eq!(detail.messages.len(), 3);
        assert_eq!(detail.messages[0].role, "user");
        assert_eq!(detail.messages[1].role, "assistant");
        assert!(detail.messages[1].usage.is_some());

        let usage = detail.messages[1].usage.as_ref().unwrap();
        assert_eq!(usage.model, "claude-opus-4-6");
        assert_eq!(usage.input_tokens, Some(100));
        assert_eq!(usage.output_tokens, Some(50));

        // Each raw block maps to the matching typed variant.
        assert!(matches!(
            &detail.messages[1].content[0],
            AmpContentBlock::Thinking { .. }
        ));
        assert!(
            matches!(&detail.messages[1].content[1], AmpContentBlock::ToolUse { name, .. } if name == "Bash")
        );
        assert!(matches!(
            &detail.messages[2].content[0],
            AmpContentBlock::ToolResult { .. }
        ));
    }

    /// An unrecognized block type is preserved in `Unknown::original_type`.
    #[test]
    fn convert_unknown_content_block() {
        let block = json!({"type": "some_future_type", "data": 42});
        let result = convert_content_block(&block);
        assert!(
            matches!(result, AmpContentBlock::Unknown { original_type: Some(t) } if t == "some_future_type")
        );
    }

    /// The summary parser tolerates fields it does not declare (message
    /// content, env, meta, debug data) and still reads the ID and messages.
    #[test]
    fn summary_deserialization_skips_heavy_fields() {
        let json_str = json!({
            "v": 100,
            "id": "T-skip-test",
            "created": 1_711_728_000_000_u64,
            "messages": [{
                "role": "user",
                "messageId": 0,
                "content": [{"type": "text", "text": "this should be skipped by summary parser"}],
                "userState": {"activeEditor": "foo.rs"},
                "fileMentions": {"files": []}
            }],
            "nextMessageId": 1,
            "env": {"initial": {"platform": {"os": "darwin"}}},
            "meta": {"traces": []},
            "~debug": {"something": true}
        });

        let raw: RawThreadSummary = serde_json::from_value(json_str).unwrap();
        assert_eq!(raw.id, "T-skip-test");
        assert_eq!(raw.messages.len(), 1);
    }
}