tap_cli/commands/
received.rs1use crate::error::{Error, Result};
2use crate::output::{print_success, OutputFormat};
3use crate::tap_integration::TapIntegration;
4use clap::Subcommand;
5use serde::Serialize;
6
7#[derive(Subcommand, Debug)]
8pub enum ReceivedCommands {
9 List {
11 #[arg(long)]
13 agent_did: Option<String>,
14 #[arg(long, default_value = "50")]
16 limit: u32,
17 #[arg(long, default_value = "0")]
19 offset: u32,
20 },
21 Pending {
23 #[arg(long)]
25 agent_did: Option<String>,
26 #[arg(long, default_value = "50")]
28 limit: u32,
29 },
30 View {
32 id: i64,
34 #[arg(long)]
36 agent_did: Option<String>,
37 },
38}
39
40#[derive(Debug, Serialize)]
41struct ReceivedInfo {
42 id: i64,
43 message_id: Option<String>,
44 source_type: String,
45 status: String,
46 received_at: String,
47 processed_at: Option<String>,
48}
49
50#[derive(Debug, Serialize)]
51struct ReceivedListResponse {
52 messages: Vec<ReceivedInfo>,
53 total: usize,
54}
55
56#[derive(Debug, Serialize)]
57struct ReceivedViewResponse {
58 id: i64,
59 message_id: Option<String>,
60 raw_message: serde_json::Value,
61 source_type: String,
62 status: String,
63 received_at: String,
64 processed_at: Option<String>,
65}
66
67fn to_received_info(r: &tap_node::storage::models::Received) -> ReceivedInfo {
68 ReceivedInfo {
69 id: r.id,
70 message_id: r.message_id.clone(),
71 source_type: format!("{:?}", r.source_type),
72 status: format!("{:?}", r.status),
73 received_at: r.received_at.clone(),
74 processed_at: r.processed_at.clone(),
75 }
76}
77
78pub async fn handle(
79 cmd: &ReceivedCommands,
80 format: OutputFormat,
81 default_agent_did: &str,
82 tap_integration: &TapIntegration,
83) -> Result<()> {
84 match cmd {
85 ReceivedCommands::List {
86 agent_did,
87 limit,
88 offset,
89 } => {
90 let effective_did = agent_did.as_deref().unwrap_or(default_agent_did);
91 let storage = tap_integration.storage_for_agent(effective_did).await?;
92 let received = storage.list_received(*limit, *offset, None, None).await?;
93
94 let messages: Vec<ReceivedInfo> = received.iter().map(to_received_info).collect();
95
96 let response = ReceivedListResponse {
97 total: messages.len(),
98 messages,
99 };
100 print_success(format, &response);
101 Ok(())
102 }
103 ReceivedCommands::Pending { agent_did, limit } => {
104 let effective_did = agent_did.as_deref().unwrap_or(default_agent_did);
105 let storage = tap_integration.storage_for_agent(effective_did).await?;
106 let received = storage.get_pending_received(*limit).await?;
107
108 let messages: Vec<ReceivedInfo> = received.iter().map(to_received_info).collect();
109
110 let response = ReceivedListResponse {
111 total: messages.len(),
112 messages,
113 };
114 print_success(format, &response);
115 Ok(())
116 }
117 ReceivedCommands::View { id, agent_did } => {
118 let effective_did = agent_did.as_deref().unwrap_or(default_agent_did);
119 let storage = tap_integration.storage_for_agent(effective_did).await?;
120 let received = storage.get_received_by_id(*id).await?;
121
122 match received {
123 Some(r) => {
124 let raw_message = serde_json::from_str(&r.raw_message)
125 .unwrap_or(serde_json::Value::String(r.raw_message.clone()));
126
127 let response = ReceivedViewResponse {
128 id: r.id,
129 message_id: r.message_id.clone(),
130 raw_message,
131 source_type: format!("{:?}", r.source_type),
132 status: format!("{:?}", r.status),
133 received_at: r.received_at.clone(),
134 processed_at: r.processed_at.clone(),
135 };
136 print_success(format, &response);
137 Ok(())
138 }
139 None => Err(Error::command_failed(format!(
140 "Received message '{}' not found",
141 id
142 ))),
143 }
144 }
145 }
146}