// mxr_tui/client.rs

1use futures::{SinkExt, StreamExt};
2use mxr_core::id::*;
3use mxr_core::types::*;
4use mxr_core::MxrError;
5use mxr_protocol::*;
6use std::path::Path;
7use std::sync::atomic::{AtomicU64, Ordering};
8use tokio::net::UnixStream;
9use tokio::sync::mpsc;
10use tokio_util::codec::Framed;
11
12pub struct Client {
13    framed: Framed<UnixStream, IpcCodec>,
14    next_id: AtomicU64,
15    event_tx: Option<mpsc::UnboundedSender<DaemonEvent>>,
16}
17
18impl Client {
19    pub async fn connect(socket_path: &Path) -> std::io::Result<Self> {
20        let stream = UnixStream::connect(socket_path).await?;
21        Ok(Self {
22            framed: Framed::new(stream, IpcCodec::new()),
23            next_id: AtomicU64::new(1),
24            event_tx: None,
25        })
26    }
27
28    pub fn with_event_channel(mut self, tx: mpsc::UnboundedSender<DaemonEvent>) -> Self {
29        self.event_tx = Some(tx);
30        self
31    }
32
33    pub async fn raw_request(&mut self, req: Request) -> Result<Response, MxrError> {
34        self.request(req).await
35    }
36
37    async fn request(&mut self, req: Request) -> Result<Response, MxrError> {
38        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
39        let msg = IpcMessage {
40            id,
41            payload: IpcPayload::Request(req),
42        };
43        self.framed
44            .send(msg)
45            .await
46            .map_err(|e| MxrError::Ipc(e.to_string()))?;
47
48        loop {
49            match self.framed.next().await {
50                Some(Ok(resp_msg)) => match resp_msg.payload {
51                    IpcPayload::Response(resp) if resp_msg.id == id => return Ok(resp),
52                    IpcPayload::Event(event) => {
53                        if let Some(ref tx) = self.event_tx {
54                            let _ = tx.send(event);
55                        }
56                        continue;
57                    }
58                    _ => continue,
59                },
60                Some(Err(e)) => return Err(MxrError::Ipc(describe_ipc_failure(&e.to_string()))),
61                None => {
62                    return Err(MxrError::Ipc(
63                        "Connection closed. The running daemon may be using an incompatible protocol. Restart the daemon after upgrading.".into(),
64                    ))
65                }
66            }
67        }
68    }
69
70    pub async fn list_envelopes(
71        &mut self,
72        limit: u32,
73        offset: u32,
74    ) -> Result<Vec<Envelope>, MxrError> {
75        let resp = self
76            .request(Request::ListEnvelopes {
77                label_id: None,
78                account_id: None,
79                limit,
80                offset,
81            })
82            .await?;
83
84        match resp {
85            Response::Ok {
86                data: ResponseData::Envelopes { envelopes },
87            } => Ok(envelopes),
88            Response::Error { message } => Err(MxrError::Ipc(message)),
89            _ => Err(MxrError::Ipc("Unexpected response".into())),
90        }
91    }
92
93    pub async fn list_labels(&mut self) -> Result<Vec<Label>, MxrError> {
94        let resp = self
95            .request(Request::ListLabels { account_id: None })
96            .await?;
97        match resp {
98            Response::Ok {
99                data: ResponseData::Labels { labels },
100            } => Ok(labels),
101            Response::Error { message } => Err(MxrError::Ipc(message)),
102            _ => Err(MxrError::Ipc("Unexpected response".into())),
103        }
104    }
105
106    pub async fn search(
107        &mut self,
108        query: &str,
109        limit: u32,
110    ) -> Result<Vec<SearchResultItem>, MxrError> {
111        let resp = self
112            .request(Request::Search {
113                query: query.to_string(),
114                limit,
115                mode: None,
116                explain: false,
117            })
118            .await?;
119        match resp {
120            Response::Ok {
121                data: ResponseData::SearchResults { results, .. },
122            } => Ok(results),
123            Response::Error { message } => Err(MxrError::Ipc(message)),
124            _ => Err(MxrError::Ipc("Unexpected response".into())),
125        }
126    }
127
128    pub async fn get_envelope(&mut self, message_id: &MessageId) -> Result<Envelope, MxrError> {
129        let resp = self
130            .request(Request::GetEnvelope {
131                message_id: message_id.clone(),
132            })
133            .await?;
134        match resp {
135            Response::Ok {
136                data: ResponseData::Envelope { envelope },
137            } => Ok(envelope),
138            Response::Error { message } => Err(MxrError::Ipc(message)),
139            _ => Err(MxrError::Ipc("Unexpected response".into())),
140        }
141    }
142
143    pub async fn get_body(&mut self, message_id: &MessageId) -> Result<MessageBody, MxrError> {
144        let resp = self
145            .request(Request::GetBody {
146                message_id: message_id.clone(),
147            })
148            .await?;
149        match resp {
150            Response::Ok {
151                data: ResponseData::Body { body },
152            } => Ok(body),
153            Response::Error { message } => Err(MxrError::Ipc(message)),
154            _ => Err(MxrError::Ipc("Unexpected response".into())),
155        }
156    }
157
158    pub async fn get_thread(
159        &mut self,
160        thread_id: &ThreadId,
161    ) -> Result<(Thread, Vec<Envelope>), MxrError> {
162        let resp = self
163            .request(Request::GetThread {
164                thread_id: thread_id.clone(),
165            })
166            .await?;
167        match resp {
168            Response::Ok {
169                data: ResponseData::Thread { thread, messages },
170            } => Ok((thread, messages)),
171            Response::Error { message } => Err(MxrError::Ipc(message)),
172            _ => Err(MxrError::Ipc("Unexpected response".into())),
173        }
174    }
175
176    pub async fn list_saved_searches(
177        &mut self,
178    ) -> Result<Vec<mxr_core::types::SavedSearch>, MxrError> {
179        let resp = self.request(Request::ListSavedSearches).await?;
180        match resp {
181            Response::Ok {
182                data: ResponseData::SavedSearches { searches },
183            } => Ok(searches),
184            Response::Error { message } => Err(MxrError::Ipc(message)),
185            _ => Err(MxrError::Ipc("Unexpected response".into())),
186        }
187    }
188
189    pub async fn list_subscriptions(
190        &mut self,
191        limit: u32,
192    ) -> Result<Vec<mxr_core::types::SubscriptionSummary>, MxrError> {
193        let resp = self.request(Request::ListSubscriptions { limit }).await?;
194        match resp {
195            Response::Ok {
196                data: ResponseData::Subscriptions { subscriptions },
197            } => Ok(subscriptions),
198            Response::Error { message } => Err(MxrError::Ipc(message)),
199            _ => Err(MxrError::Ipc("Unexpected response".into())),
200        }
201    }
202
203    pub async fn ping(&mut self) -> Result<(), MxrError> {
204        let resp = self.request(Request::Ping).await?;
205        match resp {
206            Response::Ok {
207                data: ResponseData::Pong,
208            } => Ok(()),
209            _ => Err(MxrError::Ipc("Unexpected response".into())),
210        }
211    }
212}
213
/// Turns a low-level IPC error `message` into text suitable for the user.
///
/// Messages containing `"unknown variant"` or `"missing field"` are wrapped
/// in a protocol-mismatch hint that tells the user to restart the daemon;
/// every other message is returned unchanged.
// NOTE(review): the two markers look like deserialization error phrases
// emitted when client and daemon disagree on the message schema — confirm.
fn describe_ipc_failure(message: &str) -> String {
    const MISMATCH_MARKERS: [&str; 2] = ["unknown variant", "missing field"];

    let looks_like_mismatch = MISMATCH_MARKERS
        .iter()
        .any(|&marker| message.contains(marker));
    if !looks_like_mismatch {
        return message.to_owned();
    }

    format!("IPC protocol mismatch: {message}. Restart the daemon after upgrading.")
}
220}