1use super::registry::REGISTRY;
2use super::wire::{WireAction, WireContext, WireEnvelope, WireTask};
3use anyhow::Error;
4use rill_protocol::io::client::{
5 AccessLevel, ClientProtocol, ClientReqId, ClientRequest, ClientResponse, ClientServiceRequest,
6 ClientServiceResponse,
7};
8use rill_protocol::io::transport::{Envelope, ServiceEnvelope};
9use std::collections::HashMap;
10use std::time::Duration;
11use thiserror::Error;
12use url::Url;
13use yew::services::timeout::{TimeoutService, TimeoutTask};
14use yew::services::websocket::{WebSocketService, WebSocketStatus, WebSocketTask};
15use yew::worker::{Agent, AgentLink, Context, HandlerId};
16
17struct WireRuntime {
18 who: HandlerId,
19 req_id: ClientReqId,
20 task: Box<dyn WireTask>,
21}
22
23impl WireRuntime {
24 fn wire_action(&mut self, action: WireAction, link: &mut AgentLink<LiveAgent>) {
25 let context = WireContext {
26 who: self.who,
27 req_id: self.req_id,
28 link,
29 };
30 self.task.on_action(action, context);
31 }
32}
33
34pub enum Msg {
35 WsIncoming(
36 Result<ServiceEnvelope<ClientProtocol, ClientResponse, ClientServiceRequest>, Error>,
37 ),
38 WsStatus(WebSocketStatus),
39 TryReconnect,
40 }
42
43#[derive(Debug)]
44pub enum LiveRequest {
45 Wire(Box<dyn WireTask>),
47 TerminateWire,
49
50 Forward(ClientRequest),
51 DetachRuntime,
52}
53
54#[derive(Debug, Clone)]
55pub enum LiveResponse {
56 Forwarded(ClientResponse),
57 WireDone,
58}
59
60#[derive(Debug, Clone)]
61pub enum LiveStatus {
62 Disconnected,
63 Connected,
64 AccessLevel(AccessLevel),
65}
66
67impl LiveStatus {
68 fn is_connected(&self) -> bool {
69 !matches!(self, Self::Disconnected)
70 }
71}
72
73pub struct LiveAgent {
74 link: AgentLink<Self>,
75 status: LiveStatus,
76 ws: Option<WebSocketTask>,
77 wires: HashMap<ClientReqId, WireRuntime>,
78 reconnection_task: Option<TimeoutTask>,
79}
80
81impl Agent for LiveAgent {
82 type Reach = Context<Self>;
83 type Message = Msg;
84 type Input = WireEnvelope<ClientReqId, LiveRequest>;
85 type Output = WireEnvelope<ClientReqId, LiveResponse>;
86
87 fn create(link: AgentLink<Self>) -> Self {
88 let mut this = Self {
89 link,
90 status: LiveStatus::Disconnected,
91 ws: None,
92 wires: HashMap::new(),
93 reconnection_task: None,
94 };
95 if let Err(err) = this.connect() {
96 log::error!("Can't start conencting because of: {}", err);
97 }
98 this
99 }
100
101 fn update(&mut self, msg: Self::Message) {
102 match msg {
103 Msg::WsIncoming(Ok(response)) => {
104 match response {
106 ServiceEnvelope::Envelope(envelope) => {
107 let direct_id = envelope.direct_id;
108 let action = WireAction::Incoming(envelope.data);
109 let runtime = self.wires.get_mut(&direct_id);
110 if let Some(runtime) = runtime {
111 runtime.wire_action(action, &mut self.link);
112 }
113 }
114 ServiceEnvelope::Service(service) => match service {
115 ClientServiceRequest::AccessLevel(access_level) => {
123 log::info!("ACCESS LEVEL: {:?}", access_level);
124 self.status = LiveStatus::AccessLevel(access_level);
125 self.status_to_wires(self.status.clone());
126 }
127 },
128 }
129 }
130 Msg::WsIncoming(Err(err)) => {
131 log::error!("Invalid incoiming message: {}", err);
132 }
133 Msg::WsStatus(status) => {
134 match status {
135 WebSocketStatus::Opened => {
136 log::info!("CONNECTED!");
137 self.status = LiveStatus::Connected;
138 }
139 WebSocketStatus::Closed | WebSocketStatus::Error => {
140 log::info!("DISCONNECTED!");
141 self.status = LiveStatus::Disconnected;
142 self.ws.take();
143 let duration = Duration::from_secs(5);
144 let callback = self.link.callback(|_| Msg::TryReconnect);
145 let task = TimeoutService::spawn(duration, callback);
146 self.reconnection_task = Some(task);
147 }
148 }
149 self.status_to_wires(self.status.clone());
150 }
151 Msg::TryReconnect => {
152 self.reconnection_task.take();
153 if let Err(err) = self.connect() {
154 log::error!("Can't reconnect because of: {}", err);
155 }
156 }
157 }
158 }
159
160 fn handle_input(&mut self, request: Self::Input, who: HandlerId) {
161 let id = request.id;
163 match request.data {
164 LiveRequest::Wire(task) => {
165 let mut runtime = WireRuntime {
166 who,
167 req_id: id,
168 task,
169 };
170 if self.status.is_connected() {
172 let action = WireAction::Status(self.status.clone());
177 runtime.wire_action(action, &mut self.link);
178 }
179 self.wires.insert(id, runtime);
180 }
181 LiveRequest::TerminateWire => {
182 if let Some(runtime) = self.wires.get_mut(&id) {
183 let action = WireAction::Interrupted;
184 runtime.wire_action(action, &mut self.link);
185 }
186 }
187 LiveRequest::Forward(request) => {
188 let envelope = Envelope {
189 direct_id: id,
190 data: request,
191 };
192 let service_envelope = ServiceEnvelope::Envelope(envelope);
193 self.send_service_envelope(service_envelope);
194 }
195 LiveRequest::DetachRuntime => {
196 if let Some(runtime) = self.wires.remove(&id) {
197 let output = LiveResponse::WireDone;
198 let envelope = WireEnvelope::new(runtime.req_id, output);
199 self.link.respond(runtime.who, envelope);
200 REGISTRY.release(id);
202 } else {
203 log::error!("Can't detach a runtime with id {:?}", id);
204 }
205 }
206 }
207 }
208
209 fn connected(&mut self, _id: HandlerId) {
210 }
212
213 fn disconnected(&mut self, _id: HandlerId) {
214 }
216}
217
218#[derive(Error, Debug)]
219enum ConnectorError {
220 #[error("can't get window object")]
221 NoWindow,
222 #[error("can't convert location to a string")]
223 NoString,
224}
225
226impl LiveAgent {
227 fn status_to_wires(&mut self, live_status: LiveStatus) {
228 for runtime in self.wires.values_mut() {
229 let action = WireAction::Status(live_status.clone());
230 runtime.wire_action(action, &mut self.link);
231 }
232 }
233
234 fn send_service_envelope(
235 &mut self,
236 service_envelope: ServiceEnvelope<ClientProtocol, ClientRequest, ClientServiceResponse>,
237 ) {
238 if let Some(ws) = self.ws.as_mut() {
239 if self.status.is_connected() {
240 let data = rill_protocol::encoding::to_vec(&service_envelope);
242 ws.send_binary(data);
243 } else {
244 log::error!(
245 "Connection not established yet for sending: {:?}",
246 service_envelope
247 );
248 }
249 } else {
250 log::error!("No connection to send: {:?}", service_envelope);
252 }
253 }
254
255 fn connect(&mut self) -> Result<(), Error> {
256 let mut url: Url = web_sys::window()
257 .ok_or(ConnectorError::NoWindow)?
258 .location()
259 .to_string()
260 .as_string()
261 .ok_or(ConnectorError::NoString)?
262 .parse()?;
263 let scheme = if url.scheme().ends_with('s') {
264 "wss"
265 } else {
266 "ws"
267 };
268 url.set_scheme(scheme)
269 .map_err(|_| Error::msg("Can't set scheme"))?;
270 url.set_path("/live/client");
271 let url = url.to_string();
272 log::info!("Location: {}", url);
273 let callback = self.link.callback(|data: Result<Vec<u8>, Error>| {
274 let res = data.and_then(|data| rill_protocol::encoding::from_slice(&data));
275 Msg::WsIncoming(res)
276 });
277 let notification = self.link.callback(Msg::WsStatus);
278 let ws = WebSocketService::connect_binary(&url, callback, notification)
279 .map_err(|reason| Error::msg(reason.to_string()))?;
280 self.ws = Some(ws);
281 Ok(())
282 }
283}