perspective_client/
session.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::pin::Pin;
14use std::sync::Arc;
15
16use futures::Future;
17use prost::Message;
18
19use crate::proto::request::ClientReq;
20use crate::proto::{Request, Response};
21use crate::{Client, ClientError};
22#[cfg(doc)]
23use crate::{Table, View};
24
/// The server-side representation of a connection to a [`Client`].
///
/// For each [`Client`] that wants to connect to a `perspective_server::Server`,
/// a dedicated [`Session`] must be created. The [`Session`] handles routing
/// messages emitted by the `perspective_server::Server`, as well as owning any
/// resources the [`Client`] may request.
///
/// The type parameter `E` is the error type reported by
/// [`Session::handle_request`] and [`Session::poll`].
pub trait Session<E> {
    /// Handle an incoming request from the [`Client`]. Calling
    /// [`Session::handle_request`] will result in the `send_response` parameter
    /// which was used to construct this [`Session`] to fire one or more times.
    ///
    /// ```text
    ///                      :
    ///  Client              :   Session
    /// ┏━━━━━━━━━━━━━━━━━━┓ :  ┏━━━━━━━━━━━━━━━━━━━━┓
    /// ┃ send_request     ┃━━━>┃ handle_request (*) ┃
    /// ┃ ..               ┃ :  ┃ ..                 ┃
    /// ┗━━━━━━━━━━━━━━━━━━┛ :  ┗━━━━━━━━━━━━━━━━━━━━┛
    ///                      :
    /// ```
    ///
    /// # Arguments
    ///
    /// - `request` An incoming request message, generated from a
    ///   [`Client::new`]'s `send_request` handler (which may-or-may-not be
    ///   local).
    ///
    /// # Errors
    ///
    /// The returned future resolves to `Err(E)` if the implementation reports
    /// a failure while handling `request`.
    fn handle_request(&self, request: &[u8]) -> impl Future<Output = Result<(), E>>;

    /// Flush any pending messages which may have resulted from previous
    /// [`Session::handle_request`] calls. Calling [`Session::poll`] may result
    /// in the `send_response` parameter which was used to construct this (or
    /// other) [`Session`] to fire. Whenever a [`Session::handle_request`]
    /// method is invoked for a `perspective_server::Server`, at least one
    /// [`Session::poll`] should be scheduled to clear other clients' message
    /// queues.
    ///
    /// ```text
    ///                      :
    ///  Client              :   Session                  Server
    /// ┏━━━━━━━━━━━━━━━━━━┓ :  ┏━━━━━━━━━━━━━━━━━━━┓
    /// ┃ send_request     ┃━┳━>┃ handle_request    ┃    ┏━━━━━━━━━━━━━━━━━━━┓
    /// ┃ ..               ┃ ┗━>┃ poll (*)          ┃━━━>┃ poll (*)          ┃
    /// ┗━━━━━━━━━━━━━━━━━━┛ :  ┃ ..                ┃    ┃ ..                ┃
    ///                      :  ┗━━━━━━━━━━━━━━━━━━━┛    ┗━━━━━━━━━━━━━━━━━━━┛
    /// ```
    ///
    /// # Errors
    ///
    /// The returned future resolves to `Err(E)` if the implementation reports
    /// a failure while flushing.
    fn poll(&self) -> impl Future<Output = Result<(), E>>;

    /// Close this [`Session`], cleaning up any callbacks (e.g. arguments
    /// provided to [`Session::handle_request`]) and resources (e.g. views
    /// returned by a call to [`Table::view`]).
    ///
    /// Dropping a [`Session`] without calling [`Session::close`] will cause a
    /// [`tracing`] error-level log to be emitted, but won't fail. The
    /// session's callbacks and resources will, however, leak.
    fn close(self) -> impl Future<Output = ()>;
}
81
/// Boxed error a response callback may report.
type ProxyCallbackError = Box<dyn std::error::Error + Send + Sync>;

/// Reference-counted, `Send + Sync` handle to a callback which receives a
/// response as a byte slice and may fail with a boxed error.
type ProxyCallback = Arc<dyn Fn(&[u8]) -> Result<(), ProxyCallbackError> + Send + Sync>;
84
85/// A [`Session`] implementation which tunnels through another [`Client`].
86#[derive(Clone)]
87pub struct ProxySession {
88    parent: Client,
89    callback: ProxyCallback,
90}
91
92impl ProxySession {
93    pub fn new(
94        client: Client,
95        send_response: impl Fn(&[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
96            + Send
97            + Sync
98            + 'static,
99    ) -> Self {
100        ProxySession {
101            parent: client,
102            callback: Arc::new(send_response),
103        }
104    }
105}
106
107fn encode(response: Response, callback: ProxyCallback) -> Result<(), ClientError> {
108    let mut enc = vec![];
109    response.encode(&mut enc)?;
110    callback(&enc).map_err(|x| ClientError::Unknown(x.to_string()))?;
111    Ok(())
112}
113
114impl Session<ClientError> for ProxySession {
115    async fn handle_request(&self, request: &[u8]) -> Result<(), ClientError> {
116        let req = Request::decode(request)?;
117        let callback = self.callback.clone();
118        match req.client_req.as_ref() {
119            Some(ClientReq::ViewOnUpdateReq(_)) => {
120                let on_update = move |response| -> Pin<
121                    Box<dyn Future<Output = Result<(), ClientError>> + Send>,
122                > {
123                    let callback = callback.clone();
124                    Box::pin(async move { encode(response, callback) })
125                };
126
127                self.parent.subscribe(&req, Box::new(on_update)).await?
128            },
129            Some(_) => {
130                let on_update = move |response| encode(response, callback);
131                self.parent
132                    .subscribe_once(&req, Box::new(on_update))
133                    .await?
134            },
135            None => {
136                return Err(ClientError::Internal(
137                    "ProxySession::handle_request: invalid request".to_string(),
138                ))
139            },
140        };
141
142        Ok(())
143    }
144
145    async fn poll(&self) -> Result<(), ClientError> {
146        Ok(())
147    }
148
149    async fn close(self) {}
150}