1use futures::{SinkExt, StreamExt};
2use mxr_core::id::*;
3use mxr_core::types::*;
4use mxr_core::MxrError;
5use mxr_protocol::*;
6use std::path::Path;
7use std::sync::atomic::{AtomicU64, Ordering};
8use tokio::net::UnixStream;
9use tokio::sync::mpsc;
10use tokio_util::codec::Framed;
11
12pub struct Client {
13 framed: Framed<UnixStream, IpcCodec>,
14 next_id: AtomicU64,
15 event_tx: Option<mpsc::UnboundedSender<DaemonEvent>>,
16}
17
18impl Client {
19 pub async fn connect(socket_path: &Path) -> std::io::Result<Self> {
20 let stream = UnixStream::connect(socket_path).await?;
21 Ok(Self {
22 framed: Framed::new(stream, IpcCodec::new()),
23 next_id: AtomicU64::new(1),
24 event_tx: None,
25 })
26 }
27
28 pub fn with_event_channel(mut self, tx: mpsc::UnboundedSender<DaemonEvent>) -> Self {
29 self.event_tx = Some(tx);
30 self
31 }
32
33 pub async fn raw_request(&mut self, req: Request) -> Result<Response, MxrError> {
34 self.request(req).await
35 }
36
37 async fn request(&mut self, req: Request) -> Result<Response, MxrError> {
38 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
39 let msg = IpcMessage {
40 id,
41 payload: IpcPayload::Request(req),
42 };
43 self.framed
44 .send(msg)
45 .await
46 .map_err(|e| MxrError::Ipc(e.to_string()))?;
47
48 loop {
49 match self.framed.next().await {
50 Some(Ok(resp_msg)) => match resp_msg.payload {
51 IpcPayload::Response(resp) if resp_msg.id == id => return Ok(resp),
52 IpcPayload::Event(event) => {
53 if let Some(ref tx) = self.event_tx {
54 let _ = tx.send(event);
55 }
56 continue;
57 }
58 _ => continue,
59 },
60 Some(Err(e)) => return Err(MxrError::Ipc(e.to_string())),
61 None => return Err(MxrError::Ipc("Connection closed".into())),
62 }
63 }
64 }
65
66 pub async fn list_envelopes(
67 &mut self,
68 limit: u32,
69 offset: u32,
70 ) -> Result<Vec<Envelope>, MxrError> {
71 let resp = self
72 .request(Request::ListEnvelopes {
73 label_id: None,
74 account_id: None,
75 limit,
76 offset,
77 })
78 .await?;
79
80 match resp {
81 Response::Ok {
82 data: ResponseData::Envelopes { envelopes },
83 } => Ok(envelopes),
84 Response::Error { message } => Err(MxrError::Ipc(message)),
85 _ => Err(MxrError::Ipc("Unexpected response".into())),
86 }
87 }
88
89 pub async fn list_labels(&mut self) -> Result<Vec<Label>, MxrError> {
90 let resp = self
91 .request(Request::ListLabels { account_id: None })
92 .await?;
93 match resp {
94 Response::Ok {
95 data: ResponseData::Labels { labels },
96 } => Ok(labels),
97 Response::Error { message } => Err(MxrError::Ipc(message)),
98 _ => Err(MxrError::Ipc("Unexpected response".into())),
99 }
100 }
101
102 pub async fn search(
103 &mut self,
104 query: &str,
105 limit: u32,
106 ) -> Result<Vec<SearchResultItem>, MxrError> {
107 let resp = self
108 .request(Request::Search {
109 query: query.to_string(),
110 limit,
111 })
112 .await?;
113 match resp {
114 Response::Ok {
115 data: ResponseData::SearchResults { results },
116 } => Ok(results),
117 Response::Error { message } => Err(MxrError::Ipc(message)),
118 _ => Err(MxrError::Ipc("Unexpected response".into())),
119 }
120 }
121
122 pub async fn get_envelope(&mut self, message_id: &MessageId) -> Result<Envelope, MxrError> {
123 let resp = self
124 .request(Request::GetEnvelope {
125 message_id: message_id.clone(),
126 })
127 .await?;
128 match resp {
129 Response::Ok {
130 data: ResponseData::Envelope { envelope },
131 } => Ok(envelope),
132 Response::Error { message } => Err(MxrError::Ipc(message)),
133 _ => Err(MxrError::Ipc("Unexpected response".into())),
134 }
135 }
136
137 pub async fn get_body(&mut self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
138 let resp = self
139 .request(Request::GetBody {
140 message_id: message_id.clone(),
141 })
142 .await?;
143 match resp {
144 Response::Ok {
145 data: ResponseData::Body { body },
146 } => Ok(body),
147 Response::Error { message } => Err(MxrError::Ipc(message)),
148 _ => Err(MxrError::Ipc("Unexpected response".into())),
149 }
150 }
151
152 pub async fn get_thread(
153 &mut self,
154 thread_id: &ThreadId,
155 ) -> Result<(Thread, Vec<Envelope>), MxrError> {
156 let resp = self
157 .request(Request::GetThread {
158 thread_id: thread_id.clone(),
159 })
160 .await?;
161 match resp {
162 Response::Ok {
163 data: ResponseData::Thread { thread, messages },
164 } => Ok((thread, messages)),
165 Response::Error { message } => Err(MxrError::Ipc(message)),
166 _ => Err(MxrError::Ipc("Unexpected response".into())),
167 }
168 }
169
170 pub async fn list_saved_searches(
171 &mut self,
172 ) -> Result<Vec<mxr_core::types::SavedSearch>, MxrError> {
173 let resp = self.request(Request::ListSavedSearches).await?;
174 match resp {
175 Response::Ok {
176 data: ResponseData::SavedSearches { searches },
177 } => Ok(searches),
178 Response::Error { message } => Err(MxrError::Ipc(message)),
179 _ => Err(MxrError::Ipc("Unexpected response".into())),
180 }
181 }
182
183 pub async fn list_subscriptions(
184 &mut self,
185 limit: u32,
186 ) -> Result<Vec<mxr_core::types::SubscriptionSummary>, MxrError> {
187 let resp = self.request(Request::ListSubscriptions { limit }).await?;
188 match resp {
189 Response::Ok {
190 data: ResponseData::Subscriptions { subscriptions },
191 } => Ok(subscriptions),
192 Response::Error { message } => Err(MxrError::Ipc(message)),
193 _ => Err(MxrError::Ipc("Unexpected response".into())),
194 }
195 }
196
197 pub async fn ping(&mut self) -> Result<(), MxrError> {
198 let resp = self.request(Request::Ping).await?;
199 match resp {
200 Response::Ok {
201 data: ResponseData::Pong,
202 } => Ok(()),
203 _ => Err(MxrError::Ipc("Unexpected response".into())),
204 }
205 }
206}