Skip to main content

tap_cli/commands/
received.rs

1use crate::error::{Error, Result};
2use crate::output::{print_success, OutputFormat};
3use crate::tap_integration::TapIntegration;
4use clap::Subcommand;
5use serde::Serialize;
6
7#[derive(Subcommand, Debug)]
8pub enum ReceivedCommands {
9    /// List received messages
10    List {
11        /// Agent DID for storage lookup
12        #[arg(long)]
13        agent_did: Option<String>,
14        /// Maximum results
15        #[arg(long, default_value = "50")]
16        limit: u32,
17        /// Offset for pagination
18        #[arg(long, default_value = "0")]
19        offset: u32,
20    },
21    /// List pending (unprocessed) received messages
22    Pending {
23        /// Agent DID for storage lookup
24        #[arg(long)]
25        agent_did: Option<String>,
26        /// Maximum results
27        #[arg(long, default_value = "50")]
28        limit: u32,
29    },
30    /// View a raw received message by ID
31    View {
32        /// Received message ID (numeric)
33        id: i64,
34        /// Agent DID for storage lookup
35        #[arg(long)]
36        agent_did: Option<String>,
37    },
38}
39
40#[derive(Debug, Serialize)]
41struct ReceivedInfo {
42    id: i64,
43    message_id: Option<String>,
44    source_type: String,
45    status: String,
46    received_at: String,
47    processed_at: Option<String>,
48}
49
50#[derive(Debug, Serialize)]
51struct ReceivedListResponse {
52    messages: Vec<ReceivedInfo>,
53    total: usize,
54}
55
56#[derive(Debug, Serialize)]
57struct ReceivedViewResponse {
58    id: i64,
59    message_id: Option<String>,
60    raw_message: serde_json::Value,
61    source_type: String,
62    status: String,
63    received_at: String,
64    processed_at: Option<String>,
65}
66
67fn to_received_info(r: &tap_node::storage::models::Received) -> ReceivedInfo {
68    ReceivedInfo {
69        id: r.id,
70        message_id: r.message_id.clone(),
71        source_type: format!("{:?}", r.source_type),
72        status: format!("{:?}", r.status),
73        received_at: r.received_at.clone(),
74        processed_at: r.processed_at.clone(),
75    }
76}
77
78pub async fn handle(
79    cmd: &ReceivedCommands,
80    format: OutputFormat,
81    default_agent_did: &str,
82    tap_integration: &TapIntegration,
83) -> Result<()> {
84    match cmd {
85        ReceivedCommands::List {
86            agent_did,
87            limit,
88            offset,
89        } => {
90            let effective_did = agent_did.as_deref().unwrap_or(default_agent_did);
91            let storage = tap_integration.storage_for_agent(effective_did).await?;
92            let received = storage.list_received(*limit, *offset, None, None).await?;
93
94            let messages: Vec<ReceivedInfo> = received.iter().map(to_received_info).collect();
95
96            let response = ReceivedListResponse {
97                total: messages.len(),
98                messages,
99            };
100            print_success(format, &response);
101            Ok(())
102        }
103        ReceivedCommands::Pending { agent_did, limit } => {
104            let effective_did = agent_did.as_deref().unwrap_or(default_agent_did);
105            let storage = tap_integration.storage_for_agent(effective_did).await?;
106            let received = storage.get_pending_received(*limit).await?;
107
108            let messages: Vec<ReceivedInfo> = received.iter().map(to_received_info).collect();
109
110            let response = ReceivedListResponse {
111                total: messages.len(),
112                messages,
113            };
114            print_success(format, &response);
115            Ok(())
116        }
117        ReceivedCommands::View { id, agent_did } => {
118            let effective_did = agent_did.as_deref().unwrap_or(default_agent_did);
119            let storage = tap_integration.storage_for_agent(effective_did).await?;
120            let received = storage.get_received_by_id(*id).await?;
121
122            match received {
123                Some(r) => {
124                    let raw_message = serde_json::from_str(&r.raw_message)
125                        .unwrap_or(serde_json::Value::String(r.raw_message.clone()));
126
127                    let response = ReceivedViewResponse {
128                        id: r.id,
129                        message_id: r.message_id.clone(),
130                        raw_message,
131                        source_type: format!("{:?}", r.source_type),
132                        status: format!("{:?}", r.status),
133                        received_at: r.received_at.clone(),
134                        processed_at: r.processed_at.clone(),
135                    };
136                    print_success(format, &response);
137                    Ok(())
138                }
139                None => Err(Error::command_failed(format!(
140                    "Received message '{}' not found",
141                    id
142                ))),
143            }
144        }
145    }
146}