Skip to main content

zerox1_client/
client.rs

1//! HTTP client for the zerox1-node local REST API.
2
3use crate::types::{
4    AgentId, ConversationId, HostedSendRequest, IdentityResponse, InboundEnvelope,
5    SendEnvelopeRequest, SendEnvelopeResponse,
6};
7use anyhow::{bail, Context, Result};
8use base64::engine::general_purpose::STANDARD as BASE64;
9use base64::Engine as _;
10use futures_util::StreamExt;
11use http::Request;
12use std::time::Duration;
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14use tracing::debug;
15
/// HTTP client for the local zerox1-node REST API.
///
/// Supports both **local-node** mode (`POST /envelopes/send`, `GET /ws/inbox`)
/// and **hosted-agent** mode (`POST /hosted/send`, `GET /ws/hosted/inbox`).
///
/// NOTE(review): the derived `Debug` implementation prints every field,
/// including `token`, verbatim. Avoid logging a `NodeClient` with `{:?}` if
/// the token must stay secret.
///
/// # Example
/// ```no_run
/// use zerox1_client::NodeClient;
///
/// # fn main() -> anyhow::Result<()> {
/// // `new` is fallible (it builds the underlying HTTP client), so the
/// // returned `Result` has to be unwrapped to obtain the client itself.
/// let client = NodeClient::new("http://127.0.0.1:9090", Some("my-secret".into()))?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct NodeClient {
    /// Base URL of the node's REST API (e.g. `http://127.0.0.1:9090`).
    /// Endpoint paths such as `/identity` are appended to it as-is.
    pub api_base: String,
    /// Optional API token. When present and non-empty it is sent as
    /// `Authorization: Bearer <token>` by the endpoints that authenticate
    /// with the client's own token.
    pub token: Option<String>,
    /// Underlying `reqwest` client used for all HTTP requests.
    http: reqwest::Client,
}
33
34impl NodeClient {
35    /// Create a new client. `token` is the `ZX01_API_SECRET` or a read key.
36    pub fn new(api_base: impl Into<String>, token: Option<String>) -> Result<Self> {
37        let http = reqwest::Client::builder()
38            .timeout(Duration::from_secs(15))
39            .build()
40            .context("failed to build HTTP client")?;
41        Ok(Self { api_base: api_base.into(), token, http })
42    }
43
44    // ── URL helpers ─────────────────────────────────────────────────────
45
46    /// Derive the WebSocket base URL (`http` → `ws`, `https` → `wss`).
47    pub fn ws_base(&self) -> String {
48        self.api_base
49            .replacen("https://", "wss://", 1)
50            .replacen("http://", "ws://", 1)
51    }
52
53    /// WebSocket URL for `GET /ws/inbox`.
54    pub fn inbox_ws_url(&self) -> String {
55        format!("{}/ws/inbox", self.ws_base())
56    }
57
58    /// WebSocket URL for `GET /ws/hosted/inbox` (hosted-agent mode).
59    pub fn hosted_inbox_ws_url(&self, hosted_token: &str) -> String {
60        format!("{}/ws/hosted/inbox?token={hosted_token}", self.ws_base())
61    }
62
63    // ── Auth helper ─────────────────────────────────────────────────────
64
65    fn auth_header(&self) -> Option<String> {
66        self.token.as_deref().filter(|t| !t.is_empty()).map(|t| format!("Bearer {t}"))
67    }
68
69    fn add_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
70        if let Some(auth) = self.auth_header() {
71            builder.header("Authorization", auth)
72        } else {
73            builder
74        }
75    }
76
77    // ── Node endpoints ──────────────────────────────────────────────────
78
79    /// `GET /hosted/ping` — lightweight reachability probe.
80    pub async fn ping(&self) -> bool {
81        let url = format!("{}/hosted/ping", self.api_base);
82        self.http
83            .get(&url)
84            .timeout(Duration::from_secs(5))
85            .send()
86            .await
87            .map(|r| r.status().is_success())
88            .unwrap_or(false)
89    }
90
91    /// `GET /identity` — fetch the node's own agent_id.
92    pub async fn identity(&self) -> Result<AgentId> {
93        let url = format!("{}/identity", self.api_base);
94        let resp: IdentityResponse = self
95            .add_auth(self.http.get(&url))
96            .send()
97            .await
98            .context("GET /identity")?
99            .json()
100            .await
101            .context("deserialize /identity")?;
102        debug!("identity: {}", resp.agent_id);
103        AgentId::from_hex(&resp.agent_id).context("invalid agent_id in /identity response")
104    }
105
106    /// `POST /envelopes/send` — local-node mode.
107    ///
108    /// `payload` is raw bytes; base64-encoding is handled internally.
109    pub async fn send_envelope(
110        &self,
111        msg_type: &str,
112        recipient: Option<&AgentId>,
113        conversation_id: &ConversationId,
114        payload: &[u8],
115    ) -> Result<SendEnvelopeResponse> {
116        let url = format!("{}/envelopes/send", self.api_base);
117        let body = SendEnvelopeRequest {
118            msg_type: msg_type.to_uppercase(),
119            recipient: recipient.map(|id| id.to_hex()),
120            conversation_id: conversation_id.to_hex(),
121            payload_b64: BASE64.encode(payload),
122        };
123        let resp = self
124            .add_auth(self.http.post(&url).json(&body))
125            .send()
126            .await
127            .context("POST /envelopes/send")?;
128
129        if !resp.status().is_success() {
130            let status = resp.status();
131            let text = resp.text().await.unwrap_or_default();
132            bail!("/envelopes/send {status}: {text}");
133        }
134        resp.json::<SendEnvelopeResponse>().await.context("deserialize /envelopes/send response")
135    }
136
137    /// `POST /hosted/send` — hosted-agent mode.
138    ///
139    /// `payload` is raw bytes; hex-encoding is handled internally.
140    pub async fn hosted_send(
141        &self,
142        hosted_token: &str,
143        msg_type: &str,
144        recipient: Option<&AgentId>,
145        conversation_id: &ConversationId,
146        payload: &[u8],
147    ) -> Result<()> {
148        let url = format!("{}/hosted/send", self.api_base);
149        let body = HostedSendRequest {
150            msg_type: msg_type.to_uppercase(),
151            recipient: recipient.map(|id| id.to_hex()),
152            conversation_id: conversation_id.to_hex(),
153            payload_hex: hex::encode(payload),
154        };
155        let resp = self
156            .http
157            .post(&url)
158            .header("Authorization", format!("Bearer {hosted_token}"))
159            .json(&body)
160            .send()
161            .await
162            .context("POST /hosted/send")?;
163
164        if !resp.status().is_success() {
165            let status = resp.status();
166            let text = resp.text().await.unwrap_or_default();
167            bail!("/hosted/send {status}: {text}");
168        }
169        Ok(())
170    }
171
172    // ── WS inbox ────────────────────────────────────────────────────────
173
174    /// Open a `GET /ws/inbox` stream and call `handler` for each inbound envelope.
175    ///
176    /// Returns when the connection closes or an error occurs. The caller is
177    /// responsible for reconnect logic.
178    ///
179    /// # Example
180    /// ```no_run
181    /// # use zerox1_client::{NodeClient, InboundEnvelope};
182    /// # async fn example(client: NodeClient) {
183    /// client.listen_inbox(|env| async move {
184    ///     println!("{} from {}", env.msg_type, env.sender);
185    ///     Ok(())
186    /// }).await.ok();
187    /// # }
188    /// ```
189    pub async fn listen_inbox<F, Fut>(&self, mut handler: F) -> Result<()>
190    where
191        F: FnMut(InboundEnvelope) -> Fut,
192        Fut: std::future::Future<Output = Result<()>>,
193    {
194        let url = self.inbox_ws_url();
195        let key = base64::engine::general_purpose::STANDARD.encode(uuid::Uuid::new_v4().as_bytes());
196        let mut req = Request::builder()
197            .uri(&url)
198            .header("Upgrade", "websocket")
199            .header("Connection", "Upgrade")
200            .header("Sec-WebSocket-Key", key)
201            .header("Sec-WebSocket-Version", "13");
202        if let Some(auth) = self.auth_header() {
203            req = req.header("Authorization", auth);
204        }
205        let (ws, _) = connect_async(req.body(())?).await.context("connect /ws/inbox")?;
206        let (_, mut rx) = ws.split();
207
208        while let Some(msg) = rx.next().await {
209            match msg? {
210                Message::Text(t) => {
211                    if let Ok(env) = serde_json::from_str::<InboundEnvelope>(t.as_ref()) {
212                        handler(env).await?;
213                    }
214                }
215                Message::Close(_) => break,
216                _ => {}
217            }
218        }
219        Ok(())
220    }
221}