myko_rs/
client.rs

1use crate::{
2    event::MEvent,
3    item::Eventable,
4    message::MykoMessage,
5    query::{wrap_query, QueryId, QueryItemType},
6    report::{wrap_report, MykoReport, ReportId},
7    websocket::{AutoReconnectSocket, SocketConnectionStatus},
8};
9use futures_signals::signal::{Signal, SignalExt};
10use serde::{de::DeserializeOwned, Serialize};
11use serde_json::{json, Value};
12use std::{collections::HashMap, sync::Arc};
13use tokio_stream::{wrappers::BroadcastStream, StreamExt};
14use tokio_tungstenite::tungstenite::protocol::Message;
15
16use url::Url;
17
18#[derive(Clone, Debug, PartialEq)]
19pub enum ConnectionStatus {
20    Connected(String),
21    Disconnected,
22}
23
24#[derive(Clone)]
25pub struct MykoClient {
26    socket: Arc<AutoReconnectSocket>,
27}
28
29impl Default for MykoClient {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35impl MykoClient {
36    pub fn new() -> MykoClient {
37        let socket = Arc::new(AutoReconnectSocket::new());
38
39        MykoClient { socket }
40    }
41
42    pub fn get_status(&self) -> impl Signal<Item = ConnectionStatus> {
43        self.socket
44            .status
45            .signal_cloned()
46            .map(|x| match x {
47                SocketConnectionStatus::Connecting(_addr, _) => ConnectionStatus::Disconnected,
48                SocketConnectionStatus::Connected(addr, _) => ConnectionStatus::Connected(addr),
49                SocketConnectionStatus::Disconnected => ConnectionStatus::Disconnected,
50            })
51            .dedupe_cloned()
52    }
53
54    pub async fn send_event(&self, event: MEvent) -> Result<(), String> {
55        let myko_msg = MykoMessage::<()>::Event(event);
56
57        let val = json!(myko_msg);
58
59        let str = serde_json::to_string(&val).expect("Could not serialize message");
60
61        let msg = Message::Text(str);
62
63        self.socket
64            .outgoing
65            .send(msg)
66            .map(|_| ())
67            .map_err(|err| err.to_string())
68    }
69
70    pub fn get_messages(&self) -> impl tokio_stream::Stream<Item = Value> {
71        let stream = BroadcastStream::new(self.socket.incoming.clone().subscribe());
72
73        stream.filter_map(|x| match x {
74            Ok(Message::Text(content)) => {
75                let d = serde_json::from_str::<Value>(content.as_str());
76
77                let data = d.expect("did not parse data @ get_messages");
78
79                Some(data)
80            }
81            _ => None,
82        })
83    }
84
85    pub fn set_address(&self, addr: Option<String>) {
86        if addr.is_none() {
87            self.socket.set_addr(None);
88            return;
89        }
90
91        let addr = addr.unwrap();
92
93        let parsed = Url::parse(addr.as_str());
94
95        let mut parsed = match parsed {
96            Ok(c) => c,
97            Err(e) => {
98                println!("Could not parse url: {:?}", e);
99
100                let add_ws = format!("ws://{}", addr);
101
102                match Url::parse(add_ws.as_str()) {
103                    Ok(c) => c,
104                    Err(_e) => {
105                        println!("Setting Url to None");
106                        self.socket.set_addr(None);
107                        return;
108                    }
109                }
110            }
111        };
112
113        if parsed.scheme() != "ws" {
114            let _ = parsed.set_scheme("ws");
115        }
116
117        if parsed.path() != "/myko" {
118            parsed.set_path("/myko");
119        }
120
121        if parsed.port().is_none() {
122            let _ = parsed.set_port(Some(5155));
123        }
124
125        self.socket.set_addr(Some(parsed.to_string()));
126    }
127
128    pub async fn get_connection_status(&self) -> ConnectionStatus {
129        self.get_status()
130            .to_stream()
131            .take(1)
132            .next()
133            .await
134            .unwrap_or(ConnectionStatus::Disconnected)
135    }
136
137    pub fn watch_connection_status(&self) -> impl tokio_stream::Stream<Item = ConnectionStatus> {
138        self.get_status().to_stream()
139    }
140
141    pub fn watch_report<T: MykoReport<U> + Clone + Serialize + ReportId, U: DeserializeOwned>(
142        &self,
143        report: &T,
144    ) -> impl tokio_stream::Stream<Item = U> {
145        let stream = self.get_messages();
146
147        let report_id = report.report_id().clone();
148
149        let tx = uuid::Uuid::new_v4().to_string();
150
151        let wrapped = wrap_report(tx.clone(), report);
152
153        let stream = stream.filter_map(move |x| {
154            let d = serde_json::from_value::<MykoMessage<()>>(x);
155
156            let data = match d {
157                Ok(d) => d,
158                Err(e) => {
159                    println!("Could not parse data @ watch_report: {:?}", e);
160                    return None;
161                }
162            };
163
164            match data {
165                MykoMessage::ReportResponse(response) => {
166                    if response.tx != tx {
167                        return None;
168                    }
169
170                    let data = serde_json::from_value::<U>(response.response.clone())
171                        .expect("could not parse report value @ watch_report ");
172
173                    // println!("Report {} had response: {}", report_id, response.response);
174
175                    Some(data)
176                }
177                _ => None,
178            }
179        });
180
181        if wrapped.is_err() {
182            eprint!("Could not wrap report: {:?}", wrapped.err());
183            return stream;
184        }
185
186        let wrapped = wrapped.unwrap();
187        let msg = MykoMessage::<()>::Report(wrapped);
188
189        let msg = Message::Text(serde_json::to_string(&msg).expect("Could not serialize message"));
190
191        let report_send_socket = self.socket.clone();
192        let report_send_self = self.clone();
193        let report_send_report_id = report_id.clone();
194
195        tokio::spawn(async move {
196            let mut stream = report_send_self.watch_connection_status();
197            while let Some(status) = stream.next().await {
198                match status {
199                    ConnectionStatus::Connected(_) => {
200                        match report_send_socket.outgoing.send(msg.clone()) {
201                            Ok(_) => {
202                                println!("Watching report {}", report_send_report_id);
203                            }
204                            Err(e) => {
205                                println!("Could not send message to ws: {:?}", e);
206                            }
207                        }
208                    }
209                    ConnectionStatus::Disconnected => {
210                        println!("Report {} Disconnected", report_send_report_id);
211                    }
212                }
213            }
214        });
215
216        stream
217    }
218
219    pub fn watch_query<
220        T: Clone
221            + DeserializeOwned
222            + Eventable<T, PT>
223            + PartialEq
224            + DeserializeOwned
225            + std::fmt::Debug,
226        PT: Clone,
227        Q: QueryId + QueryItemType + Serialize + Clone,
228    >(
229        &self,
230        query: Q,
231    ) -> impl tokio_stream::Stream<Item = Vec<T>> {
232        let stream = self.get_messages();
233
234        let tx = uuid::Uuid::new_v4().to_string();
235
236        let query_id = query.query_id();
237        let wrapped = wrap_query(tx.clone(), query);
238
239        let send_query_id = query_id.clone();
240        let state: Arc<std::sync::Mutex<HashMap<String, T>>> = Arc::default();
241
242        let stream = stream.filter_map(move |x| {
243            let d = serde_json::from_value::<MykoMessage<()>>(x);
244
245            let data = match d {
246                Ok(d) => d,
247                Err(e) => {
248                    println!("Could not parse data @ watch_query: {e:?}");
249                    return None;
250                }
251            };
252
253            let vec = match data {
254                MykoMessage::QueryResponse(response) => {
255                    if response.tx != tx {
256                        return None;
257                    }
258
259                    let mut state = state.lock().expect("Cannot lock state");
260                    let upserts = response.upserts;
261                    let deletes = response.deletes;
262                    let seq = response.sequence;
263
264                    if seq == 0 {
265                        println!("Clearing {} state", query_id.clone());
266                        state.clear();
267                    }
268
269                    let upserts: Vec<T> = upserts
270                        .iter()
271                        .filter_map(|x| serde_json::from_value::<T>(x.item.clone()).ok())
272                        .collect();
273
274                    for up in upserts.iter() {
275                        let _len = state.len();
276                        state.insert(up.id().clone(), up.clone());
277                    }
278
279                    for del in deletes.iter() {
280                        let _len = state.len();
281                        state.remove(del);
282                    }
283
284                    Some(state.values().cloned().collect::<Vec<T>>())
285                }
286                _ => None,
287            };
288
289            vec
290        });
291
292        if wrapped.is_err() {
293            eprint!("Could not wrap query: {:?}", wrapped.err());
294            return stream;
295        }
296
297        let wrapped = wrapped.unwrap();
298
299        let msg = MykoMessage::<()>::Query(wrapped);
300
301        let msg = Message::Text(serde_json::to_string(&msg).expect("Could not serialize message"));
302
303        let query_send_socket = self.socket.clone();
304        let query_send_self = self.clone();
305
306        tokio::spawn(async move {
307            let mut stream = query_send_self.watch_connection_status();
308            while let Some(status) = stream.next().await {
309                match status {
310                    ConnectionStatus::Connected(_) => {
311                        match query_send_socket.outgoing.send(msg.clone()) {
312                            Ok(_) => {
313                                println!("Watching query {send_query_id}");
314                            }
315                            Err(e) => {
316                                println!("Could not send message to ws: {e:?}");
317                            }
318                        }
319                    }
320                    ConnectionStatus::Disconnected => {
321                        println!("Query {send_query_id} Disconnected");
322                    }
323                }
324            }
325        });
326
327        stream
328    }
329}