1use futures::{SinkExt, StreamExt};
2use mxr_core::id::*;
3use mxr_core::types::*;
4use mxr_core::MxrError;
5use mxr_protocol::*;
6use std::path::Path;
7use std::sync::atomic::{AtomicU64, Ordering};
8use tokio::net::UnixStream;
9use tokio::sync::mpsc;
10use tokio_util::codec::Framed;
11
12pub struct Client {
13 framed: Framed<UnixStream, IpcCodec>,
14 next_id: AtomicU64,
15 event_tx: Option<mpsc::UnboundedSender<DaemonEvent>>,
16}
17
18impl Client {
19 pub async fn connect(socket_path: &Path) -> std::io::Result<Self> {
20 let stream = UnixStream::connect(socket_path).await?;
21 Ok(Self {
22 framed: Framed::new(stream, IpcCodec::new()),
23 next_id: AtomicU64::new(1),
24 event_tx: None,
25 })
26 }
27
28 pub fn with_event_channel(mut self, tx: mpsc::UnboundedSender<DaemonEvent>) -> Self {
29 self.event_tx = Some(tx);
30 self
31 }
32
33 pub async fn raw_request(&mut self, req: Request) -> Result<Response, MxrError> {
34 self.request(req).await
35 }
36
37 async fn request(&mut self, req: Request) -> Result<Response, MxrError> {
38 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
39 let msg = IpcMessage {
40 id,
41 payload: IpcPayload::Request(req),
42 };
43 self.framed
44 .send(msg)
45 .await
46 .map_err(|e| MxrError::Ipc(e.to_string()))?;
47
48 loop {
49 match self.framed.next().await {
50 Some(Ok(resp_msg)) => match resp_msg.payload {
51 IpcPayload::Response(resp) if resp_msg.id == id => return Ok(resp),
52 IpcPayload::Event(event) => {
53 if let Some(ref tx) = self.event_tx {
54 let _ = tx.send(event);
55 }
56 continue;
57 }
58 _ => continue,
59 },
60 Some(Err(e)) => return Err(MxrError::Ipc(describe_ipc_failure(&e.to_string()))),
61 None => {
62 return Err(MxrError::Ipc(
63 "Connection closed. The running daemon may be using an incompatible protocol. Restart the daemon after upgrading.".into(),
64 ))
65 }
66 }
67 }
68 }
69
70 pub async fn list_envelopes(
71 &mut self,
72 limit: u32,
73 offset: u32,
74 ) -> Result<Vec<Envelope>, MxrError> {
75 let resp = self
76 .request(Request::ListEnvelopes {
77 label_id: None,
78 account_id: None,
79 limit,
80 offset,
81 })
82 .await?;
83
84 match resp {
85 Response::Ok {
86 data: ResponseData::Envelopes { envelopes },
87 } => Ok(envelopes),
88 Response::Error { message } => Err(MxrError::Ipc(message)),
89 _ => Err(MxrError::Ipc("Unexpected response".into())),
90 }
91 }
92
93 pub async fn list_labels(&mut self) -> Result<Vec<Label>, MxrError> {
94 let resp = self
95 .request(Request::ListLabels { account_id: None })
96 .await?;
97 match resp {
98 Response::Ok {
99 data: ResponseData::Labels { labels },
100 } => Ok(labels),
101 Response::Error { message } => Err(MxrError::Ipc(message)),
102 _ => Err(MxrError::Ipc("Unexpected response".into())),
103 }
104 }
105
106 pub async fn search(
107 &mut self,
108 query: &str,
109 limit: u32,
110 ) -> Result<Vec<SearchResultItem>, MxrError> {
111 let resp = self
112 .request(Request::Search {
113 query: query.to_string(),
114 limit,
115 mode: None,
116 explain: false,
117 })
118 .await?;
119 match resp {
120 Response::Ok {
121 data: ResponseData::SearchResults { results, .. },
122 } => Ok(results),
123 Response::Error { message } => Err(MxrError::Ipc(message)),
124 _ => Err(MxrError::Ipc("Unexpected response".into())),
125 }
126 }
127
128 pub async fn get_envelope(&mut self, message_id: &MessageId) -> Result<Envelope, MxrError> {
129 let resp = self
130 .request(Request::GetEnvelope {
131 message_id: message_id.clone(),
132 })
133 .await?;
134 match resp {
135 Response::Ok {
136 data: ResponseData::Envelope { envelope },
137 } => Ok(envelope),
138 Response::Error { message } => Err(MxrError::Ipc(message)),
139 _ => Err(MxrError::Ipc("Unexpected response".into())),
140 }
141 }
142
143 pub async fn get_body(&mut self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
144 let resp = self
145 .request(Request::GetBody {
146 message_id: message_id.clone(),
147 })
148 .await?;
149 match resp {
150 Response::Ok {
151 data: ResponseData::Body { body },
152 } => Ok(body),
153 Response::Error { message } => Err(MxrError::Ipc(message)),
154 _ => Err(MxrError::Ipc("Unexpected response".into())),
155 }
156 }
157
158 pub async fn get_thread(
159 &mut self,
160 thread_id: &ThreadId,
161 ) -> Result<(Thread, Vec<Envelope>), MxrError> {
162 let resp = self
163 .request(Request::GetThread {
164 thread_id: thread_id.clone(),
165 })
166 .await?;
167 match resp {
168 Response::Ok {
169 data: ResponseData::Thread { thread, messages },
170 } => Ok((thread, messages)),
171 Response::Error { message } => Err(MxrError::Ipc(message)),
172 _ => Err(MxrError::Ipc("Unexpected response".into())),
173 }
174 }
175
176 pub async fn list_saved_searches(
177 &mut self,
178 ) -> Result<Vec<mxr_core::types::SavedSearch>, MxrError> {
179 let resp = self.request(Request::ListSavedSearches).await?;
180 match resp {
181 Response::Ok {
182 data: ResponseData::SavedSearches { searches },
183 } => Ok(searches),
184 Response::Error { message } => Err(MxrError::Ipc(message)),
185 _ => Err(MxrError::Ipc("Unexpected response".into())),
186 }
187 }
188
189 pub async fn list_subscriptions(
190 &mut self,
191 limit: u32,
192 ) -> Result<Vec<mxr_core::types::SubscriptionSummary>, MxrError> {
193 let resp = self.request(Request::ListSubscriptions { limit }).await?;
194 match resp {
195 Response::Ok {
196 data: ResponseData::Subscriptions { subscriptions },
197 } => Ok(subscriptions),
198 Response::Error { message } => Err(MxrError::Ipc(message)),
199 _ => Err(MxrError::Ipc("Unexpected response".into())),
200 }
201 }
202
203 pub async fn ping(&mut self) -> Result<(), MxrError> {
204 let resp = self.request(Request::Ping).await?;
205 match resp {
206 Response::Ok {
207 data: ResponseData::Pong,
208 } => Ok(()),
209 _ => Err(MxrError::Ipc("Unexpected response".into())),
210 }
211 }
212}
213
/// Turns a low-level IPC error message into the text shown to the caller.
///
/// Messages containing "unknown variant" or "missing field" are wrapped in a
/// protocol-mismatch notice that advises restarting the daemon; every other
/// message is returned unchanged.
fn describe_ipc_failure(message: &str) -> String {
    // Substrings treated as a sign of a protocol mismatch.
    const MISMATCH_MARKERS: [&str; 2] = ["unknown variant", "missing field"];

    let is_mismatch = MISMATCH_MARKERS
        .iter()
        .any(|marker| message.contains(marker));
    if !is_mismatch {
        return message.to_owned();
    }

    format!("IPC protocol mismatch: {message}. Restart the daemon after upgrading.")
}