rate_ui/agents/live/
agent.rs

1use super::registry::REGISTRY;
2use super::wire::{WireAction, WireContext, WireEnvelope, WireTask};
3use anyhow::Error;
4use rill_protocol::io::client::{
5    AccessLevel, ClientProtocol, ClientReqId, ClientRequest, ClientResponse, ClientServiceRequest,
6    ClientServiceResponse,
7};
8use rill_protocol::io::transport::{Envelope, ServiceEnvelope};
9use std::collections::HashMap;
10use std::time::Duration;
11use thiserror::Error;
12use url::Url;
13use yew::services::timeout::{TimeoutService, TimeoutTask};
14use yew::services::websocket::{WebSocketService, WebSocketStatus, WebSocketTask};
15use yew::worker::{Agent, AgentLink, Context, HandlerId};
16
17struct WireRuntime {
18    who: HandlerId,
19    req_id: ClientReqId,
20    task: Box<dyn WireTask>,
21}
22
23impl WireRuntime {
24    fn wire_action(&mut self, action: WireAction, link: &mut AgentLink<LiveAgent>) {
25        let context = WireContext {
26            who: self.who,
27            req_id: self.req_id,
28            link,
29        };
30        self.task.on_action(action, context);
31    }
32}
33
34pub enum Msg {
35    WsIncoming(
36        Result<ServiceEnvelope<ClientProtocol, ClientResponse, ClientServiceRequest>, Error>,
37    ),
38    WsStatus(WebSocketStatus),
39    TryReconnect,
40    // Don't add to many variants here
41}
42
43#[derive(Debug)]
44pub enum LiveRequest {
45    /// Created a new wire
46    Wire(Box<dyn WireTask>),
47    /// Interrupts a wire
48    TerminateWire,
49
50    Forward(ClientRequest),
51    DetachRuntime,
52}
53
54#[derive(Debug, Clone)]
55pub enum LiveResponse {
56    Forwarded(ClientResponse),
57    WireDone,
58}
59
60#[derive(Debug, Clone)]
61pub enum LiveStatus {
62    Disconnected,
63    Connected,
64    AccessLevel(AccessLevel),
65}
66
67impl LiveStatus {
68    fn is_connected(&self) -> bool {
69        !matches!(self, Self::Disconnected)
70    }
71}
72
73pub struct LiveAgent {
74    link: AgentLink<Self>,
75    status: LiveStatus,
76    ws: Option<WebSocketTask>,
77    wires: HashMap<ClientReqId, WireRuntime>,
78    reconnection_task: Option<TimeoutTask>,
79}
80
81impl Agent for LiveAgent {
82    type Reach = Context<Self>;
83    type Message = Msg;
84    type Input = WireEnvelope<ClientReqId, LiveRequest>;
85    type Output = WireEnvelope<ClientReqId, LiveResponse>;
86
87    fn create(link: AgentLink<Self>) -> Self {
88        let mut this = Self {
89            link,
90            status: LiveStatus::Disconnected,
91            ws: None,
92            wires: HashMap::new(),
93            reconnection_task: None,
94        };
95        if let Err(err) = this.connect() {
96            log::error!("Can't start conencting because of: {}", err);
97        }
98        this
99    }
100
101    fn update(&mut self, msg: Self::Message) {
102        match msg {
103            Msg::WsIncoming(Ok(response)) => {
104                //log::trace!("WS-RECV: {:?}", response);
105                match response {
106                    ServiceEnvelope::Envelope(envelope) => {
107                        let direct_id = envelope.direct_id;
108                        let action = WireAction::Incoming(envelope.data);
109                        let runtime = self.wires.get_mut(&direct_id);
110                        if let Some(runtime) = runtime {
111                            runtime.wire_action(action, &mut self.link);
112                        }
113                    }
114                    ServiceEnvelope::Service(service) => match service {
115                        /*
116                        ClientServiceRequest::Ping => {
117                            let response = ClientServiceResponse::Pong;
118                            let service_envelope = ServiceEnvelope::Service(response);
119                            self.send_service_envelope(service_envelope);
120                        }
121                        */
122                        ClientServiceRequest::AccessLevel(access_level) => {
123                            log::info!("ACCESS LEVEL: {:?}", access_level);
124                            self.status = LiveStatus::AccessLevel(access_level);
125                            self.status_to_wires(self.status.clone());
126                        }
127                    },
128                }
129            }
130            Msg::WsIncoming(Err(err)) => {
131                log::error!("Invalid incoiming message: {}", err);
132            }
133            Msg::WsStatus(status) => {
134                match status {
135                    WebSocketStatus::Opened => {
136                        log::info!("CONNECTED!");
137                        self.status = LiveStatus::Connected;
138                    }
139                    WebSocketStatus::Closed | WebSocketStatus::Error => {
140                        log::info!("DISCONNECTED!");
141                        self.status = LiveStatus::Disconnected;
142                        self.ws.take();
143                        let duration = Duration::from_secs(5);
144                        let callback = self.link.callback(|_| Msg::TryReconnect);
145                        let task = TimeoutService::spawn(duration, callback);
146                        self.reconnection_task = Some(task);
147                    }
148                }
149                self.status_to_wires(self.status.clone());
150            }
151            Msg::TryReconnect => {
152                self.reconnection_task.take();
153                if let Err(err) = self.connect() {
154                    log::error!("Can't reconnect because of: {}", err);
155                }
156            }
157        }
158    }
159
160    fn handle_input(&mut self, request: Self::Input, who: HandlerId) {
161        //log::trace!("LiveAgent request {:?} from {:?}", request, who);
162        let id = request.id;
163        match request.data {
164            LiveRequest::Wire(task) => {
165                let mut runtime = WireRuntime {
166                    who,
167                    req_id: id,
168                    task,
169                };
170                // TODO: Do I have to send desconnected status in any case?
171                if self.status.is_connected() {
172                    /*
173                    let action = WireAction::Status(LiveStatus::Connected);
174                    runtime.wire_action(action, &mut self.link);
175                    */
176                    let action = WireAction::Status(self.status.clone());
177                    runtime.wire_action(action, &mut self.link);
178                }
179                self.wires.insert(id, runtime);
180            }
181            LiveRequest::TerminateWire => {
182                if let Some(runtime) = self.wires.get_mut(&id) {
183                    let action = WireAction::Interrupted;
184                    runtime.wire_action(action, &mut self.link);
185                }
186            }
187            LiveRequest::Forward(request) => {
188                let envelope = Envelope {
189                    direct_id: id,
190                    data: request,
191                };
192                let service_envelope = ServiceEnvelope::Envelope(envelope);
193                self.send_service_envelope(service_envelope);
194            }
195            LiveRequest::DetachRuntime => {
196                if let Some(runtime) = self.wires.remove(&id) {
197                    let output = LiveResponse::WireDone;
198                    let envelope = WireEnvelope::new(runtime.req_id, output);
199                    self.link.respond(runtime.who, envelope);
200                    // Nothing will be forwarded since the runtime has removed
201                    REGISTRY.release(id);
202                } else {
203                    log::error!("Can't detach a runtime with id {:?}", id);
204                }
205            }
206        }
207    }
208
209    fn connected(&mut self, _id: HandlerId) {
210        //log::trace!("Connected to Live: {:?}", id);
211    }
212
213    fn disconnected(&mut self, _id: HandlerId) {
214        //log::trace!("Disconnected from Live: {:?}", id);
215    }
216}
217
218#[derive(Error, Debug)]
219enum ConnectorError {
220    #[error("can't get window object")]
221    NoWindow,
222    #[error("can't convert location to a string")]
223    NoString,
224}
225
226impl LiveAgent {
227    fn status_to_wires(&mut self, live_status: LiveStatus) {
228        for runtime in self.wires.values_mut() {
229            let action = WireAction::Status(live_status.clone());
230            runtime.wire_action(action, &mut self.link);
231        }
232    }
233
234    fn send_service_envelope(
235        &mut self,
236        service_envelope: ServiceEnvelope<ClientProtocol, ClientRequest, ClientServiceResponse>,
237    ) {
238        if let Some(ws) = self.ws.as_mut() {
239            if self.status.is_connected() {
240                //log::trace!("WS-SEND: {:?}", service_envelope);
241                let data = rill_protocol::encoding::to_vec(&service_envelope);
242                ws.send_binary(data);
243            } else {
244                log::error!(
245                    "Connection not established yet for sending: {:?}",
246                    service_envelope
247                );
248            }
249        } else {
250            // TODO: Add and use tasks queue
251            log::error!("No connection to send: {:?}", service_envelope);
252        }
253    }
254
255    fn connect(&mut self) -> Result<(), Error> {
256        let mut url: Url = web_sys::window()
257            .ok_or(ConnectorError::NoWindow)?
258            .location()
259            .to_string()
260            .as_string()
261            .ok_or(ConnectorError::NoString)?
262            .parse()?;
263        let scheme = if url.scheme().ends_with('s') {
264            "wss"
265        } else {
266            "ws"
267        };
268        url.set_scheme(scheme)
269            .map_err(|_| Error::msg("Can't set scheme"))?;
270        url.set_path("/live/client");
271        let url = url.to_string();
272        log::info!("Location: {}", url);
273        let callback = self.link.callback(|data: Result<Vec<u8>, Error>| {
274            let res = data.and_then(|data| rill_protocol::encoding::from_slice(&data));
275            Msg::WsIncoming(res)
276        });
277        let notification = self.link.callback(Msg::WsStatus);
278        let ws = WebSocketService::connect_binary(&url, callback, notification)
279            .map_err(|reason| Error::msg(reason.to_string()))?;
280        self.ws = Some(ws);
281        Ok(())
282    }
283}