Skip to main content

mxr_tui/
client.rs

1use futures::{SinkExt, StreamExt};
2use mxr_core::id::*;
3use mxr_core::types::*;
4use mxr_core::MxrError;
5use mxr_protocol::*;
6use std::path::Path;
7use std::sync::atomic::{AtomicU64, Ordering};
8use tokio::net::UnixStream;
9use tokio::sync::mpsc;
10use tokio_util::codec::Framed;
11
12pub struct Client {
13    framed: Framed<UnixStream, IpcCodec>,
14    next_id: AtomicU64,
15    event_tx: Option<mpsc::UnboundedSender<DaemonEvent>>,
16}
17
18impl Client {
19    pub async fn connect(socket_path: &Path) -> std::io::Result<Self> {
20        let stream = UnixStream::connect(socket_path).await?;
21        Ok(Self {
22            framed: Framed::new(stream, IpcCodec::new()),
23            next_id: AtomicU64::new(1),
24            event_tx: None,
25        })
26    }
27
28    pub fn with_event_channel(mut self, tx: mpsc::UnboundedSender<DaemonEvent>) -> Self {
29        self.event_tx = Some(tx);
30        self
31    }
32
33    pub async fn raw_request(&mut self, req: Request) -> Result<Response, MxrError> {
34        self.request(req).await
35    }
36
37    async fn request(&mut self, req: Request) -> Result<Response, MxrError> {
38        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
39        let msg = IpcMessage {
40            id,
41            payload: IpcPayload::Request(req),
42        };
43        self.framed
44            .send(msg)
45            .await
46            .map_err(|e| MxrError::Ipc(e.to_string()))?;
47
48        loop {
49            match self.framed.next().await {
50                Some(Ok(resp_msg)) => match resp_msg.payload {
51                    IpcPayload::Response(resp) if resp_msg.id == id => return Ok(resp),
52                    IpcPayload::Event(event) => {
53                        if let Some(ref tx) = self.event_tx {
54                            let _ = tx.send(event);
55                        }
56                        continue;
57                    }
58                    _ => continue,
59                },
60                Some(Err(e)) => return Err(MxrError::Ipc(e.to_string())),
61                None => return Err(MxrError::Ipc("Connection closed".into())),
62            }
63        }
64    }
65
66    pub async fn list_envelopes(
67        &mut self,
68        limit: u32,
69        offset: u32,
70    ) -> Result<Vec<Envelope>, MxrError> {
71        let resp = self
72            .request(Request::ListEnvelopes {
73                label_id: None,
74                account_id: None,
75                limit,
76                offset,
77            })
78            .await?;
79
80        match resp {
81            Response::Ok {
82                data: ResponseData::Envelopes { envelopes },
83            } => Ok(envelopes),
84            Response::Error { message } => Err(MxrError::Ipc(message)),
85            _ => Err(MxrError::Ipc("Unexpected response".into())),
86        }
87    }
88
89    pub async fn list_labels(&mut self) -> Result<Vec<Label>, MxrError> {
90        let resp = self
91            .request(Request::ListLabels { account_id: None })
92            .await?;
93        match resp {
94            Response::Ok {
95                data: ResponseData::Labels { labels },
96            } => Ok(labels),
97            Response::Error { message } => Err(MxrError::Ipc(message)),
98            _ => Err(MxrError::Ipc("Unexpected response".into())),
99        }
100    }
101
102    pub async fn search(
103        &mut self,
104        query: &str,
105        limit: u32,
106    ) -> Result<Vec<SearchResultItem>, MxrError> {
107        let resp = self
108            .request(Request::Search {
109                query: query.to_string(),
110                limit,
111            })
112            .await?;
113        match resp {
114            Response::Ok {
115                data: ResponseData::SearchResults { results },
116            } => Ok(results),
117            Response::Error { message } => Err(MxrError::Ipc(message)),
118            _ => Err(MxrError::Ipc("Unexpected response".into())),
119        }
120    }
121
122    pub async fn get_envelope(&mut self, message_id: &MessageId) -> Result<Envelope, MxrError> {
123        let resp = self
124            .request(Request::GetEnvelope {
125                message_id: message_id.clone(),
126            })
127            .await?;
128        match resp {
129            Response::Ok {
130                data: ResponseData::Envelope { envelope },
131            } => Ok(envelope),
132            Response::Error { message } => Err(MxrError::Ipc(message)),
133            _ => Err(MxrError::Ipc("Unexpected response".into())),
134        }
135    }
136
137    pub async fn get_body(&mut self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
138        let resp = self
139            .request(Request::GetBody {
140                message_id: message_id.clone(),
141            })
142            .await?;
143        match resp {
144            Response::Ok {
145                data: ResponseData::Body { body },
146            } => Ok(body),
147            Response::Error { message } => Err(MxrError::Ipc(message)),
148            _ => Err(MxrError::Ipc("Unexpected response".into())),
149        }
150    }
151
152    pub async fn get_thread(
153        &mut self,
154        thread_id: &ThreadId,
155    ) -> Result<(Thread, Vec<Envelope>), MxrError> {
156        let resp = self
157            .request(Request::GetThread {
158                thread_id: thread_id.clone(),
159            })
160            .await?;
161        match resp {
162            Response::Ok {
163                data: ResponseData::Thread { thread, messages },
164            } => Ok((thread, messages)),
165            Response::Error { message } => Err(MxrError::Ipc(message)),
166            _ => Err(MxrError::Ipc("Unexpected response".into())),
167        }
168    }
169
170    pub async fn list_saved_searches(
171        &mut self,
172    ) -> Result<Vec<mxr_core::types::SavedSearch>, MxrError> {
173        let resp = self.request(Request::ListSavedSearches).await?;
174        match resp {
175            Response::Ok {
176                data: ResponseData::SavedSearches { searches },
177            } => Ok(searches),
178            Response::Error { message } => Err(MxrError::Ipc(message)),
179            _ => Err(MxrError::Ipc("Unexpected response".into())),
180        }
181    }
182
183    pub async fn list_subscriptions(
184        &mut self,
185        limit: u32,
186    ) -> Result<Vec<mxr_core::types::SubscriptionSummary>, MxrError> {
187        let resp = self.request(Request::ListSubscriptions { limit }).await?;
188        match resp {
189            Response::Ok {
190                data: ResponseData::Subscriptions { subscriptions },
191            } => Ok(subscriptions),
192            Response::Error { message } => Err(MxrError::Ipc(message)),
193            _ => Err(MxrError::Ipc("Unexpected response".into())),
194        }
195    }
196
197    pub async fn ping(&mut self) -> Result<(), MxrError> {
198        let resp = self.request(Request::Ping).await?;
199        match resp {
200            Response::Ok {
201                data: ResponseData::Pong,
202            } => Ok(()),
203            _ => Err(MxrError::Ipc("Unexpected response".into())),
204        }
205    }
206}