perspective_client/session.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::sync::Arc;
14
15use futures::Future;
16use prost::Message;
17
18use crate::proto::request::ClientReq;
19use crate::proto::{Request, Response};
20use crate::{Client, ClientError, asyncfn};
21#[cfg(doc)]
22use crate::{Table, View};
23
24/// The server-side representation of a connection to a [`Client`].
25///
26/// For each [`Client`] that wants to connect to a `perspective_server::Server`,
27/// a dedicated [`Session`] must be created. The [`Session`] handles routing
28/// messages emitted by the `perspective_server::Server`ve_server::Server`, as
29/// well as owning any resources the [`Client`] may request.
30pub trait Session<E> {
31 /// Handle an incoming request from the [`Client`]. Calling
32 /// [`Session::handle_request`] will result in the `send_response` parameter
33 /// which was used to construct this [`Session`] to fire one or more times.
34 ///
35 /// ```text
36 /// :
37 /// Client : Session
38 /// ┏━━━━━━━━━━━━━━━━━━┓ : ┏━━━━━━━━━━━━━━━━━━━━┓
39 /// ┃ send_request ┃━━━>┃ handle_request (*) ┃
40 /// ┃ .. ┃ : ┃ .. ┃
41 /// ┗━━━━━━━━━━━━━━━━━━┛ : ┗━━━━━━━━━━━━━━━━━━━━┛
42 /// :
43 /// ```
44 ///
45 /// # Arguments
46 ///
47 /// - `request` An incoming request message, generated from a
48 /// [`Client::new`]'s `send_request` handler (which may-or-may-not be
49 /// local).
50 fn handle_request(&self, request: &[u8]) -> impl Future<Output = Result<(), E>>;
51
52 /// Flush any pending messages which may have resulted from previous
53 /// [`Session::handle_request`] calls. Calling [`Session::poll`] may result
54 /// in the `send_response` parameter which was used to construct this (or
55 /// other) [`Session`] to fire. Whenever a [`Session::handle_request`]
56 /// method is invoked for a `perspective_server::Server`, at least one
57 /// [`Session::poll`] should be scheduled to clear other clients message
58 /// queues.
59 ///
60 /// ```text
61 /// :
62 /// Client : Session Server
63 /// ┏━━━━━━━━━━━━━━━━━━┓ : ┏━━━━━━━━━━━━━━━━━━━┓
64 /// ┃ send_request ┃━┳━>┃ handle_request ┃ ┏━━━━━━━━━━━━━━━━━━━┓
65 /// ┃ .. ┃ ┗━>┃ poll (*) ┃━━━>┃ poll (*) ┃
66 /// ┗━━━━━━━━━━━━━━━━━━┛ : ┃ .. ┃ ┃ .. ┃
67 /// : ┗━━━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━━━━━┛
68 /// ```
69 fn poll(&self) -> impl Future<Output = Result<(), E>>;
70
71 /// Close this [`Session`], cleaning up any callbacks (e.g. arguments
72 /// provided to [`Session::handle_request`]) and resources (e.g. views
73 /// returned by a call to [`Table::view`]).
74 ///
75 /// Dropping a [`Session`] outside of the context of [`Session::close`]
76 /// will cause a [`tracing`] error-level log to be emitted, but won't fail.
77 /// They will, however, leak.
78 fn close(self) -> impl Future<Output = ()>;
79}
80
81type ProxyCallback =
82 Arc<dyn Fn(&[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync>;
83
84/// A [`Session`] implementation which tunnels through another [`Client`].
85#[derive(Clone)]
86pub struct ProxySession {
87 parent: Client,
88 callback: ProxyCallback,
89}
90
91impl ProxySession {
92 pub fn new(
93 client: Client,
94 send_response: impl Fn(&[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
95 + Send
96 + Sync
97 + 'static,
98 ) -> Self {
99 ProxySession {
100 parent: client,
101 callback: Arc::new(send_response),
102 }
103 }
104}
105
106fn encode(response: Response, callback: ProxyCallback) -> Result<(), ClientError> {
107 let mut enc = vec![];
108 response.encode(&mut enc)?;
109 callback(&enc).map_err(|x| ClientError::Unknown(x.to_string()))?;
110 Ok(())
111}
112
113impl Session<ClientError> for ProxySession {
114 async fn handle_request(&self, request: &[u8]) -> Result<(), ClientError> {
115 let req = Request::decode(request)?;
116 let callback = self.callback.clone();
117 match req.client_req.as_ref() {
118 Some(ClientReq::ViewOnUpdateReq(_)) => {
119 let on_update =
120 asyncfn!(callback, async move |response| encode(response, callback));
121 self.parent.subscribe(&req, on_update).await?
122 },
123 Some(_) => {
124 let on_update = move |response| encode(response, callback);
125 self.parent
126 .subscribe_once(&req, Box::new(on_update))
127 .await?
128 },
129 None => {
130 return Err(ClientError::Internal(
131 "ProxySession::handle_request: invalid request".to_string(),
132 ));
133 },
134 };
135
136 Ok(())
137 }
138
139 async fn poll(&self) -> Result<(), ClientError> {
140 Ok(())
141 }
142
143 async fn close(self) {}
144}