1use crate::types::{
4 AgentId, ConversationId, HostedSendRequest, IdentityResponse, InboundEnvelope,
5 SendEnvelopeRequest, SendEnvelopeResponse,
6};
7use anyhow::{bail, Context, Result};
8use base64::engine::general_purpose::STANDARD as BASE64;
9use base64::Engine as _;
10use futures_util::StreamExt;
11use http::Request;
12use std::time::Duration;
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14use tracing::debug;
15
16#[derive(Clone, Debug)]
28pub struct NodeClient {
29 pub api_base: String,
30 pub token: Option<String>,
31 http: reqwest::Client,
32}
33
34impl NodeClient {
35 pub fn new(api_base: impl Into<String>, token: Option<String>) -> Result<Self> {
37 let http = reqwest::Client::builder()
38 .timeout(Duration::from_secs(15))
39 .build()
40 .context("failed to build HTTP client")?;
41 Ok(Self { api_base: api_base.into(), token, http })
42 }
43
44 pub fn ws_base(&self) -> String {
48 self.api_base
49 .replacen("https://", "wss://", 1)
50 .replacen("http://", "ws://", 1)
51 }
52
53 pub fn inbox_ws_url(&self) -> String {
55 format!("{}/ws/inbox", self.ws_base())
56 }
57
58 pub fn hosted_inbox_ws_url(&self, hosted_token: &str) -> String {
60 format!("{}/ws/hosted/inbox?token={hosted_token}", self.ws_base())
61 }
62
63 fn auth_header(&self) -> Option<String> {
66 self.token.as_deref().filter(|t| !t.is_empty()).map(|t| format!("Bearer {t}"))
67 }
68
69 fn add_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
70 if let Some(auth) = self.auth_header() {
71 builder.header("Authorization", auth)
72 } else {
73 builder
74 }
75 }
76
77 pub async fn ping(&self) -> bool {
81 let url = format!("{}/hosted/ping", self.api_base);
82 self.http
83 .get(&url)
84 .timeout(Duration::from_secs(5))
85 .send()
86 .await
87 .map(|r| r.status().is_success())
88 .unwrap_or(false)
89 }
90
91 pub async fn identity(&self) -> Result<AgentId> {
93 let url = format!("{}/identity", self.api_base);
94 let resp: IdentityResponse = self
95 .add_auth(self.http.get(&url))
96 .send()
97 .await
98 .context("GET /identity")?
99 .json()
100 .await
101 .context("deserialize /identity")?;
102 debug!("identity: {}", resp.agent_id);
103 AgentId::from_hex(&resp.agent_id).context("invalid agent_id in /identity response")
104 }
105
106 pub async fn send_envelope(
110 &self,
111 msg_type: &str,
112 recipient: Option<&AgentId>,
113 conversation_id: &ConversationId,
114 payload: &[u8],
115 ) -> Result<SendEnvelopeResponse> {
116 let url = format!("{}/envelopes/send", self.api_base);
117 let body = SendEnvelopeRequest {
118 msg_type: msg_type.to_uppercase(),
119 recipient: recipient.map(|id| id.to_hex()),
120 conversation_id: conversation_id.to_hex(),
121 payload_b64: BASE64.encode(payload),
122 };
123 let resp = self
124 .add_auth(self.http.post(&url).json(&body))
125 .send()
126 .await
127 .context("POST /envelopes/send")?;
128
129 if !resp.status().is_success() {
130 let status = resp.status();
131 let text = resp.text().await.unwrap_or_default();
132 bail!("/envelopes/send {status}: {text}");
133 }
134 resp.json::<SendEnvelopeResponse>().await.context("deserialize /envelopes/send response")
135 }
136
137 pub async fn hosted_send(
141 &self,
142 hosted_token: &str,
143 msg_type: &str,
144 recipient: Option<&AgentId>,
145 conversation_id: &ConversationId,
146 payload: &[u8],
147 ) -> Result<()> {
148 let url = format!("{}/hosted/send", self.api_base);
149 let body = HostedSendRequest {
150 msg_type: msg_type.to_uppercase(),
151 recipient: recipient.map(|id| id.to_hex()),
152 conversation_id: conversation_id.to_hex(),
153 payload_hex: hex::encode(payload),
154 };
155 let resp = self
156 .http
157 .post(&url)
158 .header("Authorization", format!("Bearer {hosted_token}"))
159 .json(&body)
160 .send()
161 .await
162 .context("POST /hosted/send")?;
163
164 if !resp.status().is_success() {
165 let status = resp.status();
166 let text = resp.text().await.unwrap_or_default();
167 bail!("/hosted/send {status}: {text}");
168 }
169 Ok(())
170 }
171
172 pub async fn listen_inbox<F, Fut>(&self, mut handler: F) -> Result<()>
190 where
191 F: FnMut(InboundEnvelope) -> Fut,
192 Fut: std::future::Future<Output = Result<()>>,
193 {
194 let url = self.inbox_ws_url();
195 let key = base64::engine::general_purpose::STANDARD.encode(uuid::Uuid::new_v4().as_bytes());
196 let mut req = Request::builder()
197 .uri(&url)
198 .header("Upgrade", "websocket")
199 .header("Connection", "Upgrade")
200 .header("Sec-WebSocket-Key", key)
201 .header("Sec-WebSocket-Version", "13");
202 if let Some(auth) = self.auth_header() {
203 req = req.header("Authorization", auth);
204 }
205 let (ws, _) = connect_async(req.body(())?).await.context("connect /ws/inbox")?;
206 let (_, mut rx) = ws.split();
207
208 while let Some(msg) = rx.next().await {
209 match msg? {
210 Message::Text(t) => {
211 if let Ok(env) = serde_json::from_str::<InboundEnvelope>(t.as_ref()) {
212 handler(env).await?;
213 }
214 }
215 Message::Close(_) => break,
216 _ => {}
217 }
218 }
219 Ok(())
220 }
221}