1use futures::{SinkExt, StreamExt};
2use mxr_core::id::*;
3use mxr_core::types::*;
4use mxr_core::MxrError;
5use mxr_protocol::*;
6use std::path::Path;
7use std::sync::atomic::{AtomicU64, Ordering};
8use tokio::net::UnixStream;
9use tokio::sync::mpsc;
10use tokio_util::codec::Framed;
11
12pub struct Client {
13 framed: Framed<UnixStream, IpcCodec>,
14 next_id: AtomicU64,
15 event_tx: Option<mpsc::UnboundedSender<DaemonEvent>>,
16}
17
18impl Client {
19 pub async fn connect(socket_path: &Path) -> std::io::Result<Self> {
20 let stream = UnixStream::connect(socket_path).await?;
21 Ok(Self {
22 framed: Framed::new(stream, IpcCodec::new()),
23 next_id: AtomicU64::new(1),
24 event_tx: None,
25 })
26 }
27
28 pub fn with_event_channel(mut self, tx: mpsc::UnboundedSender<DaemonEvent>) -> Self {
29 self.event_tx = Some(tx);
30 self
31 }
32
33 pub async fn raw_request(&mut self, req: Request) -> Result<Response, MxrError> {
34 self.request(req).await
35 }
36
37 async fn request(&mut self, req: Request) -> Result<Response, MxrError> {
38 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
39 let msg = IpcMessage {
40 id,
41 payload: IpcPayload::Request(req),
42 };
43 self.framed
44 .send(msg)
45 .await
46 .map_err(|e| MxrError::Ipc(e.to_string()))?;
47
48 loop {
49 match self.framed.next().await {
50 Some(Ok(resp_msg)) => match resp_msg.payload {
51 IpcPayload::Response(resp) if resp_msg.id == id => return Ok(resp),
52 IpcPayload::Event(event) => {
53 if let Some(ref tx) = self.event_tx {
54 let _ = tx.send(event);
55 }
56 continue;
57 }
58 _ => continue,
59 },
60 Some(Err(e)) => return Err(MxrError::Ipc(e.to_string())),
61 None => return Err(MxrError::Ipc("Connection closed".into())),
62 }
63 }
64 }
65
66 pub async fn list_envelopes(
67 &mut self,
68 limit: u32,
69 offset: u32,
70 ) -> Result<Vec<Envelope>, MxrError> {
71 let resp = self
72 .request(Request::ListEnvelopes {
73 label_id: None,
74 account_id: None,
75 limit,
76 offset,
77 })
78 .await?;
79
80 match resp {
81 Response::Ok {
82 data: ResponseData::Envelopes { envelopes },
83 } => Ok(envelopes),
84 Response::Error { message } => Err(MxrError::Ipc(message)),
85 _ => Err(MxrError::Ipc("Unexpected response".into())),
86 }
87 }
88
89 pub async fn list_labels(&mut self) -> Result<Vec<Label>, MxrError> {
90 let resp = self
91 .request(Request::ListLabels { account_id: None })
92 .await?;
93 match resp {
94 Response::Ok {
95 data: ResponseData::Labels { labels },
96 } => Ok(labels),
97 Response::Error { message } => Err(MxrError::Ipc(message)),
98 _ => Err(MxrError::Ipc("Unexpected response".into())),
99 }
100 }
101
102 pub async fn search(
103 &mut self,
104 query: &str,
105 limit: u32,
106 ) -> Result<Vec<SearchResultItem>, MxrError> {
107 let resp = self
108 .request(Request::Search {
109 query: query.to_string(),
110 limit,
111 mode: None,
112 explain: false,
113 })
114 .await?;
115 match resp {
116 Response::Ok {
117 data: ResponseData::SearchResults { results },
118 } => Ok(results),
119 Response::Error { message } => Err(MxrError::Ipc(message)),
120 _ => Err(MxrError::Ipc("Unexpected response".into())),
121 }
122 }
123
124 pub async fn get_envelope(&mut self, message_id: &MessageId) -> Result<Envelope, MxrError> {
125 let resp = self
126 .request(Request::GetEnvelope {
127 message_id: message_id.clone(),
128 })
129 .await?;
130 match resp {
131 Response::Ok {
132 data: ResponseData::Envelope { envelope },
133 } => Ok(envelope),
134 Response::Error { message } => Err(MxrError::Ipc(message)),
135 _ => Err(MxrError::Ipc("Unexpected response".into())),
136 }
137 }
138
139 pub async fn get_body(&mut self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
140 let resp = self
141 .request(Request::GetBody {
142 message_id: message_id.clone(),
143 })
144 .await?;
145 match resp {
146 Response::Ok {
147 data: ResponseData::Body { body },
148 } => Ok(body),
149 Response::Error { message } => Err(MxrError::Ipc(message)),
150 _ => Err(MxrError::Ipc("Unexpected response".into())),
151 }
152 }
153
154 pub async fn get_thread(
155 &mut self,
156 thread_id: &ThreadId,
157 ) -> Result<(Thread, Vec<Envelope>), MxrError> {
158 let resp = self
159 .request(Request::GetThread {
160 thread_id: thread_id.clone(),
161 })
162 .await?;
163 match resp {
164 Response::Ok {
165 data: ResponseData::Thread { thread, messages },
166 } => Ok((thread, messages)),
167 Response::Error { message } => Err(MxrError::Ipc(message)),
168 _ => Err(MxrError::Ipc("Unexpected response".into())),
169 }
170 }
171
172 pub async fn list_saved_searches(
173 &mut self,
174 ) -> Result<Vec<mxr_core::types::SavedSearch>, MxrError> {
175 let resp = self.request(Request::ListSavedSearches).await?;
176 match resp {
177 Response::Ok {
178 data: ResponseData::SavedSearches { searches },
179 } => Ok(searches),
180 Response::Error { message } => Err(MxrError::Ipc(message)),
181 _ => Err(MxrError::Ipc("Unexpected response".into())),
182 }
183 }
184
185 pub async fn list_subscriptions(
186 &mut self,
187 limit: u32,
188 ) -> Result<Vec<mxr_core::types::SubscriptionSummary>, MxrError> {
189 let resp = self.request(Request::ListSubscriptions { limit }).await?;
190 match resp {
191 Response::Ok {
192 data: ResponseData::Subscriptions { subscriptions },
193 } => Ok(subscriptions),
194 Response::Error { message } => Err(MxrError::Ipc(message)),
195 _ => Err(MxrError::Ipc("Unexpected response".into())),
196 }
197 }
198
199 pub async fn ping(&mut self) -> Result<(), MxrError> {
200 let resp = self.request(Request::Ping).await?;
201 match resp {
202 Response::Ok {
203 data: ResponseData::Pong,
204 } => Ok(()),
205 _ => Err(MxrError::Ipc("Unexpected response".into())),
206 }
207 }
208}