perspective_client/session.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error as StdError;
14use std::sync::Arc;
15
16use futures::Future;
17use prost::Message;
18
19use crate::proto::request::ClientReq;
20use crate::proto::{Request, Response};
21use crate::{Client, ClientError, asyncfn};
22#[cfg(doc)]
23use crate::{Table, View};
24
25/// The server-side representation of a connection to a [`Client`].
26///
27/// For each [`Client`] that wants to connect to a `perspective_server::Server`,
28/// a dedicated [`Session`] must be created. The [`Session`] handles routing
/// messages emitted by the `perspective_server::Server`, as well as owning
/// any resources the [`Client`] may request.
pub trait Session<E> {
    /// Process a single encoded request received from the [`Client`].
    ///
    /// As a consequence of calling [`Session::handle_request`], the
    /// `send_response` parameter this [`Session`] was constructed with fires
    /// one or more times.
    ///
    /// ```text
    ///                      :
    ///  Client              :   Session
    /// ┏━━━━━━━━━━━━━━━━━━┓ :  ┏━━━━━━━━━━━━━━━━━━━━┓
    /// ┃ send_request     ┃━━━>┃ handle_request (*) ┃
    /// ┃ ..               ┃ :  ┃ ..                 ┃
    /// ┗━━━━━━━━━━━━━━━━━━┛ :  ┗━━━━━━━━━━━━━━━━━━━━┛
    ///                      :
    /// ```
    ///
    /// # Arguments
    ///
    /// - `request` The bytes of an incoming request message, as produced by
    ///   the `send_request` handler given to [`Client::new`] (that handler
    ///   may or may not live in the same process).
    fn handle_request(&self, request: &[u8]) -> impl Future<Output = Result<(), E>>;

    /// Flush messages left pending by earlier [`Session::handle_request`]
    /// calls.
    ///
    /// Calling [`Session::poll`] may fire the `send_response` parameter of
    /// this [`Session`], or of other [`Session`]s. Every time
    /// [`Session::handle_request`] is invoked for a
    /// `perspective_server::Server`, at least one [`Session::poll`] should be
    /// scheduled so that other clients' message queues are cleared.
    ///
    /// ```text
    ///                      :
    ///  Client              :   Session                  Server
    /// ┏━━━━━━━━━━━━━━━━━━┓ :  ┏━━━━━━━━━━━━━━━━━━━┓
    /// ┃ send_request     ┃━┳━>┃ handle_request    ┃    ┏━━━━━━━━━━━━━━━━━━━┓
    /// ┃ ..               ┃ ┗━>┃ poll (*)          ┃━━━>┃ poll (*)          ┃
    /// ┗━━━━━━━━━━━━━━━━━━┛ :  ┃ ..                ┃    ┃ ..                ┃
    ///                      :  ┗━━━━━━━━━━━━━━━━━━━┛    ┗━━━━━━━━━━━━━━━━━━━┛
    /// ```
    fn poll(&self) -> impl Future<Output = Result<(), E>>;

    /// Shut this [`Session`] down, releasing its callbacks (e.g. arguments
    /// provided to [`Session::handle_request`]) and its resources (e.g. views
    /// returned by a call to [`Table::view`]).
    ///
    /// A [`Session`] that is dropped without going through
    /// [`Session::close`] emits a [`tracing`] error-level log instead of
    /// failing; the callbacks and resources it held are leaked.
    fn close(self) -> impl Future<Output = ()>;
}
81
/// Error type a [`ProxyCallback`] may return: any boxed error that is safe to
/// move and share across threads.
type ProxyCallbackError = Box<dyn StdError + Send + Sync>;

/// Shared, thread-safe handler that receives a byte slice and reports
/// success or a [`ProxyCallbackError`].
type ProxyCallback = Arc<
    dyn Fn(&[u8]) -> Result<(), ProxyCallbackError> + Send + Sync,
>;
84
85/// A [`Session`] implementation which tunnels through another [`Client`].
86#[derive(Clone)]
87pub struct ProxySession {
88 parent: Client,
89 callback: ProxyCallback,
90}
91
92impl ProxySession {
93 pub fn new(
94 client: Client,
95 send_response: impl Fn(&[u8]) -> Result<(), ProxyCallbackError> + Send + Sync + 'static,
96 ) -> Self {
97 ProxySession {
98 parent: client,
99 callback: Arc::new(send_response),
100 }
101 }
102}
103
104fn encode(response: Response, callback: ProxyCallback) -> Result<(), ClientError> {
105 let mut enc = vec![];
106 response.encode(&mut enc)?;
107 callback(&enc).map_err(|x| ClientError::Unknown(x.to_string()))?;
108 Ok(())
109}
110
111impl Session<ClientError> for ProxySession {
112 async fn handle_request(&self, request: &[u8]) -> Result<(), ClientError> {
113 let req = Request::decode(request)?;
114 let callback = self.callback.clone();
115 match req.client_req.as_ref() {
116 Some(ClientReq::ViewOnUpdateReq(_)) => {
117 let on_update =
118 asyncfn!(callback, async move |response| encode(response, callback));
119 self.parent.subscribe(&req, on_update).await?
120 },
121 Some(_) => {
122 let on_update = move |response| encode(response, callback);
123 self.parent
124 .subscribe_once(&req, Box::new(on_update))
125 .await?
126 },
127 None => {
128 return Err(ClientError::Internal(
129 "ProxySession::handle_request: invalid request".to_string(),
130 ));
131 },
132 };
133
134 Ok(())
135 }
136
137 async fn poll(&self) -> Result<(), ClientError> {
138 Ok(())
139 }
140
141 async fn close(self) {}
142}