1use crate::{
2 event::MEvent,
3 item::Eventable,
4 message::MykoMessage,
5 query::{wrap_query, QueryId, QueryItemType},
6 report::{wrap_report, MykoReport, ReportId},
7 websocket::{AutoReconnectSocket, SocketConnectionStatus},
8};
9use futures_signals::signal::{Signal, SignalExt};
10use serde::{de::DeserializeOwned, Serialize};
11use serde_json::{json, Value};
12use std::{collections::HashMap, sync::Arc};
13use tokio_stream::{wrappers::BroadcastStream, StreamExt};
14use tokio_tungstenite::tungstenite::protocol::Message;
15
16use url::Url;
17
/// Connection state of a [`MykoClient`] as exposed to callers.
///
/// This is a simplified view of the socket's own status: an in-progress
/// connection attempt is reported as [`ConnectionStatus::Disconnected`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// The socket is connected; the payload is the address it is connected to.
    Connected(String),
    /// The socket is not connected (this includes "currently connecting").
    Disconnected,
}
23
/// Client handle used to exchange Myko messages over a websocket.
///
/// The handle only holds an `Arc`, so cloning it yields another handle to the
/// same underlying socket rather than a new connection.
#[derive(Clone)]
pub struct MykoClient {
    /// Shared socket through which all messages are sent and received.
    socket: Arc<AutoReconnectSocket>,
}
28
29impl Default for MykoClient {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl MykoClient {
36 pub fn new() -> MykoClient {
37 let socket = Arc::new(AutoReconnectSocket::new());
38
39 MykoClient { socket }
40 }
41
42 pub fn get_status(&self) -> impl Signal<Item = ConnectionStatus> {
43 self.socket
44 .status
45 .signal_cloned()
46 .map(|x| match x {
47 SocketConnectionStatus::Connecting(_addr, _) => ConnectionStatus::Disconnected,
48 SocketConnectionStatus::Connected(addr, _) => ConnectionStatus::Connected(addr),
49 SocketConnectionStatus::Disconnected => ConnectionStatus::Disconnected,
50 })
51 .dedupe_cloned()
52 }
53
54 pub async fn send_event(&self, event: MEvent) -> Result<(), String> {
55 let myko_msg = MykoMessage::<()>::Event(event);
56
57 let val = json!(myko_msg);
58
59 let str = serde_json::to_string(&val).expect("Could not serialize message");
60
61 let msg = Message::Text(str);
62
63 self.socket
64 .outgoing
65 .send(msg)
66 .map(|_| ())
67 .map_err(|err| err.to_string())
68 }
69
70 pub fn get_messages(&self) -> impl tokio_stream::Stream<Item = Value> {
71 let stream = BroadcastStream::new(self.socket.incoming.clone().subscribe());
72
73 stream.filter_map(|x| match x {
74 Ok(Message::Text(content)) => {
75 let d = serde_json::from_str::<Value>(content.as_str());
76
77 let data = d.expect("did not parse data @ get_messages");
78
79 Some(data)
80 }
81 _ => None,
82 })
83 }
84
85 pub fn set_address(&self, addr: Option<String>) {
86 if addr.is_none() {
87 self.socket.set_addr(None);
88 return;
89 }
90
91 let addr = addr.unwrap();
92
93 let parsed = Url::parse(addr.as_str());
94
95 let mut parsed = match parsed {
96 Ok(c) => c,
97 Err(e) => {
98 println!("Could not parse url: {:?}", e);
99
100 let add_ws = format!("ws://{}", addr);
101
102 match Url::parse(add_ws.as_str()) {
103 Ok(c) => c,
104 Err(_e) => {
105 println!("Setting Url to None");
106 self.socket.set_addr(None);
107 return;
108 }
109 }
110 }
111 };
112
113 if parsed.scheme() != "ws" {
114 let _ = parsed.set_scheme("ws");
115 }
116
117 if parsed.path() != "/myko" {
118 parsed.set_path("/myko");
119 }
120
121 if parsed.port().is_none() {
122 let _ = parsed.set_port(Some(5155));
123 }
124
125 self.socket.set_addr(Some(parsed.to_string()));
126 }
127
128 pub async fn get_connection_status(&self) -> ConnectionStatus {
129 self.get_status()
130 .to_stream()
131 .take(1)
132 .next()
133 .await
134 .unwrap_or(ConnectionStatus::Disconnected)
135 }
136
137 pub fn watch_connection_status(&self) -> impl tokio_stream::Stream<Item = ConnectionStatus> {
138 self.get_status().to_stream()
139 }
140
141 pub fn watch_report<T: MykoReport<U> + Clone + Serialize + ReportId, U: DeserializeOwned>(
142 &self,
143 report: &T,
144 ) -> impl tokio_stream::Stream<Item = U> {
145 let stream = self.get_messages();
146
147 let report_id = report.report_id().clone();
148
149 let tx = uuid::Uuid::new_v4().to_string();
150
151 let wrapped = wrap_report(tx.clone(), report);
152
153 let stream = stream.filter_map(move |x| {
154 let d = serde_json::from_value::<MykoMessage<()>>(x);
155
156 let data = match d {
157 Ok(d) => d,
158 Err(e) => {
159 println!("Could not parse data @ watch_report: {:?}", e);
160 return None;
161 }
162 };
163
164 match data {
165 MykoMessage::ReportResponse(response) => {
166 if response.tx != tx {
167 return None;
168 }
169
170 let data = serde_json::from_value::<U>(response.response.clone())
171 .expect("could not parse report value @ watch_report ");
172
173 Some(data)
176 }
177 _ => None,
178 }
179 });
180
181 if wrapped.is_err() {
182 eprint!("Could not wrap report: {:?}", wrapped.err());
183 return stream;
184 }
185
186 let wrapped = wrapped.unwrap();
187 let msg = MykoMessage::<()>::Report(wrapped);
188
189 let msg = Message::Text(serde_json::to_string(&msg).expect("Could not serialize message"));
190
191 let report_send_socket = self.socket.clone();
192 let report_send_self = self.clone();
193 let report_send_report_id = report_id.clone();
194
195 tokio::spawn(async move {
196 let mut stream = report_send_self.watch_connection_status();
197 while let Some(status) = stream.next().await {
198 match status {
199 ConnectionStatus::Connected(_) => {
200 match report_send_socket.outgoing.send(msg.clone()) {
201 Ok(_) => {
202 println!("Watching report {}", report_send_report_id);
203 }
204 Err(e) => {
205 println!("Could not send message to ws: {:?}", e);
206 }
207 }
208 }
209 ConnectionStatus::Disconnected => {
210 println!("Report {} Disconnected", report_send_report_id);
211 }
212 }
213 }
214 });
215
216 stream
217 }
218
219 pub fn watch_query<
220 T: Clone
221 + DeserializeOwned
222 + Eventable<T, PT>
223 + PartialEq
224 + DeserializeOwned
225 + std::fmt::Debug,
226 PT: Clone,
227 Q: QueryId + QueryItemType + Serialize + Clone,
228 >(
229 &self,
230 query: Q,
231 ) -> impl tokio_stream::Stream<Item = Vec<T>> {
232 let stream = self.get_messages();
233
234 let tx = uuid::Uuid::new_v4().to_string();
235
236 let query_id = query.query_id();
237 let wrapped = wrap_query(tx.clone(), query);
238
239 let send_query_id = query_id.clone();
240 let state: Arc<std::sync::Mutex<HashMap<String, T>>> = Arc::default();
241
242 let stream = stream.filter_map(move |x| {
243 let d = serde_json::from_value::<MykoMessage<()>>(x);
244
245 let data = match d {
246 Ok(d) => d,
247 Err(e) => {
248 println!("Could not parse data @ watch_query: {e:?}");
249 return None;
250 }
251 };
252
253 let vec = match data {
254 MykoMessage::QueryResponse(response) => {
255 if response.tx != tx {
256 return None;
257 }
258
259 let mut state = state.lock().expect("Cannot lock state");
260 let upserts = response.upserts;
261 let deletes = response.deletes;
262 let seq = response.sequence;
263
264 if seq == 0 {
265 println!("Clearing {} state", query_id.clone());
266 state.clear();
267 }
268
269 let upserts: Vec<T> = upserts
270 .iter()
271 .filter_map(|x| serde_json::from_value::<T>(x.item.clone()).ok())
272 .collect();
273
274 for up in upserts.iter() {
275 let _len = state.len();
276 state.insert(up.id().clone(), up.clone());
277 }
278
279 for del in deletes.iter() {
280 let _len = state.len();
281 state.remove(del);
282 }
283
284 Some(state.values().cloned().collect::<Vec<T>>())
285 }
286 _ => None,
287 };
288
289 vec
290 });
291
292 if wrapped.is_err() {
293 eprint!("Could not wrap query: {:?}", wrapped.err());
294 return stream;
295 }
296
297 let wrapped = wrapped.unwrap();
298
299 let msg = MykoMessage::<()>::Query(wrapped);
300
301 let msg = Message::Text(serde_json::to_string(&msg).expect("Could not serialize message"));
302
303 let query_send_socket = self.socket.clone();
304 let query_send_self = self.clone();
305
306 tokio::spawn(async move {
307 let mut stream = query_send_self.watch_connection_status();
308 while let Some(status) = stream.next().await {
309 match status {
310 ConnectionStatus::Connected(_) => {
311 match query_send_socket.outgoing.send(msg.clone()) {
312 Ok(_) => {
313 println!("Watching query {send_query_id}");
314 }
315 Err(e) => {
316 println!("Could not send message to ws: {e:?}");
317 }
318 }
319 }
320 ConnectionStatus::Disconnected => {
321 println!("Query {send_query_id} Disconnected");
322 }
323 }
324 }
325 });
326
327 stream
328 }
329}