stream_deck_plugin/
client.rs

1use futures_util::SinkExt;
2use crate::ClientError;
3use crate::ClientRequest;
4use crate::client_request::SetImagePayload;
5use crate::client_request::SetStatePayload;
6use crate::client_request::SetTitlePayload;
7use crate::ClientResponse;
8use futures_util::StreamExt;
9
10use color_eyre::Result;
11use std::env;
12use tokio::sync::mpsc;
13use tokio_tungstenite::connect_async;
14use tungstenite::Message;
15use url::Url;
16
17
18#[derive(Debug)]
19enum Command {
20    Request(ClientRequest),
21}
22
23#[derive(Debug)]
24pub struct Client {
25    rx: mpsc::Receiver<ClientResponse>,
26    tx: mpsc::Sender<Command>,
27}
28
29impl Client {
30    pub async fn new_from_args() -> Result<Self> {
31        let mut args = env::args();
32
33        args.next().expect("Binary should have name"); // skip binary name
34
35        let mut port = None;
36        let mut plugin_uuid = None;
37        let mut register_event = None;
38        let mut info = None;
39
40        while let Some(a) = args.next() {
41            match a.as_str() {
42                "-port" => {
43                    let p = args.next().ok_or_else(|| ClientError::MissingValue {
44                        param: String::from("port"),
45                    })?;
46                    let p = p.parse::<u16>().map_err(|_e| ClientError::InvalidValue {
47                        param: String::from("port"),
48                        value: String::from(p),
49                    })?;
50                    port = Some(p);
51
52                    tracing::info!("Port {port:?}\n");
53                }
54                "-pluginUUID" => {
55                    let v = args.next().ok_or_else(|| ClientError::MissingValue {
56                        param: String::from("pluginUUID"),
57                    })?;
58                    plugin_uuid = Some(v);
59                    tracing::info!("plugin_uuid {plugin_uuid:?}");
60                }
61                "-registerEvent" => {
62                    let v = args.next().ok_or_else(|| ClientError::MissingValue {
63                        param: String::from("registerEvent"),
64                    })?;
65                    register_event = Some(v);
66
67                    tracing::info!("register_event {register_event:?}");
68                }
69                "-info" => {
70                    let v = args.next().ok_or_else(|| ClientError::MissingValue {
71                        param: String::from("info"),
72                    })?;
73                    info = Some(v);
74                    tracing::info!("info {info:?}");
75                }
76                o => {
77                    tracing::info!("Unhandled argument {o}");
78                }
79            }
80        }
81        if port.is_none() {
82            return Err(ClientError::MissingParameter {
83                param: String::from("port"),
84            }
85            .into());
86        }
87        let port = port.unwrap();
88        if plugin_uuid.is_none() {
89            return Err(ClientError::MissingParameter {
90                param: String::from("pluginUUID"),
91            }
92            .into());
93        }
94        let plugin_uuid = plugin_uuid.unwrap();
95        if register_event.is_none() {
96            return Err(ClientError::MissingParameter {
97                param: String::from("registerEvent"),
98            }
99            .into());
100        }
101        let register_event = register_event.unwrap();
102        if register_event != "registerPlugin" {
103            return Err(ClientError::Severe {
104                msg: format!("Unexpected registerEvent '{register_event}'"),
105            }
106            .into());            
107        }
108        if info.is_none() {
109            return Err(ClientError::MissingParameter {
110                param: String::from("info"),
111            }
112            .into());
113        }
114        let _info = info.unwrap();
115
116        let url = format!("ws://localhost:{port}/");
117        let url = Url::parse(&url)?;
118
119        /*
120        let (mut socket, response) =
121            connect(url).expect("Can't connect");
122        */
123        let (socket, response) = connect_async(url).await.expect("Can connect");
124
125        tracing::info!("Connected to the server");
126
127        tracing::info!("Response HTTP code: {}", response.status());
128
129        tracing::info!("Response contains the following headers:");
130        for (ref header, _value) in response.headers() {
131            tracing::info!("* {}", header);
132        }
133
134        let (tx, rx) = mpsc::channel::<ClientResponse>(16);
135
136        let (mut write, read) = socket.split();
137        let register = ClientRequest::RegisterPlugin {
138            uuid: plugin_uuid.clone(),
139        };
140
141        let j = serde_json::to_string(&register)?;
142
143        write.send(j.into()).await?;
144
145
146        tokio::spawn(async move {
147            read.for_each(|m| async {
148                match m {
149                    Ok(msg) => {
150                        match msg {
151                            Message::Text(ref txt) => {
152                                match serde_json::from_str::<ClientResponse>(&txt) {
153                                    Ok(r) => {
154                                        // Note: we could do custom handling here ...
155                                        /*
156                                        match r {
157                                            PluginResponse::WillAppear { .. } => {
158                                                tracing::info!("WillAppear {r:?}");
159                                                let _ = tx.send( r ).await;
160                                            }
161                                            o => {
162                                                tracing::warn!("Unhandled {o:?}");
163                                            }
164                                        }
165                                        */
166                                        // ... or just blindly route
167                                        let _ = tx.send(r).await;
168                                    }
169                                    Err(e) => {
170                                        tracing::info!("ERROR! Failed parsing: {msg} -> {e:?}");
171                                    }
172                                }
173                            }
174                            m => {
175                                tracing::info!("WARN! Unhandled: {m}");
176                            }
177                        }
178                    }
179                    Err(e) => {
180                        tracing::info!("ERROR! Error reading message: {e:?}");
181                    }
182                }
183            })
184            .await;
185        });
186
187        let (tx, mut cmd_rx) = mpsc::channel::<Command>(16);
188        tokio::spawn(async move {
189            loop {
190                let timeout = tokio::time::sleep(tokio::time::Duration::from_millis(2000));
191                tokio::select! {
192                    _ = timeout => {
193                        // tracing::info!("Heartbeat");
194
195                        // for future extensions
196                    }
197                    Some( cmd ) = cmd_rx.recv() => {
198                        match cmd {
199                            Command::Request( request ) => {
200                                    match serde_json::to_string(&request) {
201                                        Ok( j ) => {
202                                            //tracing::info!("Sending {j:?}");
203                                            let _ = write.send(j.into()).await;
204                                        },
205                                        Err(_) => todo!()
206                                    }
207                            }
208                        }
209                    }
210                }
211            }
212        });
213
214        Ok(Self { rx, tx })
215    }
216
217    pub async fn recv(&mut self) -> Option<ClientResponse> {
218        self.rx.recv().await
219    }
220
221    pub async fn send(&self, request: ClientRequest) -> Result<()> {
222        let _ = self.tx.send(Command::Request(request)).await?;
223        Ok(())
224    }
225
226    pub async fn send_set_title(
227        &self,
228        context: String,
229        title: String,
230        target: String,
231        state: usize,
232    ) -> Result<()> {
233        self.send(ClientRequest::SetTitle {
234            context,
235            payload: SetTitlePayload {
236                title,
237                target,
238                state,
239            },
240        })
241        .await
242    }
243    pub async fn send_set_state(&self, context: String, state: usize) -> Result<()> {
244        self.send(ClientRequest::SetState {
245            context,
246            payload: SetStatePayload { state },
247        })
248        .await
249    }
250    pub async fn send_set_image(&self, context: String, image: String, state: usize) -> Result<()> {
251        self.send(ClientRequest::SetImage {
252            context,
253            payload: SetImagePayload { image, state },
254        })
255        .await
256    }
257}