1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
use std::pin::Pin;
use std::sync::Arc;
use futures::Future;
use prost::Message;
use crate::proto::request::ClientReq;
use crate::proto::{Request, Response};
use crate::{Client, ClientError};
#[cfg(doc)]
use crate::{Table, View};
/// The server-side representation of a connection to a [`Client`].
///
/// For each [`Client`] that wants to connect to a
/// `perspective_server::Server`, a dedicated [`Session`] must be created. The
/// [`Session`] handles routing messages emitted by the
/// `perspective_server::Server`, as well as owning any resources the
/// [`Client`] may request.
///
/// The type parameter `E` is the error type that [`Session::handle_request`]
/// and [`Session::poll`] resolve to on failure.
pub trait Session<E> {
/// Handle an incoming request from the [`Client`]. Calling
/// [`Session::handle_request`] will result in the `send_response` parameter
/// which was used to construct this [`Session`] to fire one or more times.
///
/// ```text
///                      :
///  Client              :   Session
/// ┏━━━━━━━━━━━━━━━━━━┓ :  ┏━━━━━━━━━━━━━━━━━━━━┓
/// ┃ send_request     ┃━━━>┃ handle_request (*) ┃
/// ┃ ..               ┃ :  ┃ ..                 ┃
/// ┗━━━━━━━━━━━━━━━━━━┛ :  ┗━━━━━━━━━━━━━━━━━━━━┛
///                      :
/// ```
///
/// # Arguments
///
/// - `request` An incoming request message, generated from a
///   [`Client::new`]'s `send_request` handler (which may-or-may-not be
///   local).
///
/// # Errors
///
/// The returned future resolves to `Err(E)` when the implementation fails
/// to process `request`.
fn handle_request(&self, request: &[u8]) -> impl Future<Output = Result<(), E>>;

/// Flush any pending messages which may have resulted from previous
/// [`Session::handle_request`] calls. Calling [`Session::poll`] may result
/// in the `send_response` parameter which was used to construct this (or
/// other) [`Session`] to fire. Whenever a [`Session::handle_request`]
/// method is invoked for a `perspective_server::Server`, at least one
/// [`Session::poll`] should be scheduled to clear other clients' message
/// queues.
///
/// ```text
///                      :
///  Client              :   Session                  Server
/// ┏━━━━━━━━━━━━━━━━━━┓ :  ┏━━━━━━━━━━━━━━━━━━━┓
/// ┃ send_request     ┃━┳━>┃ handle_request    ┃    ┏━━━━━━━━━━━━━━━━━━━┓
/// ┃ ..               ┃ ┗━>┃ poll (*)          ┃━━━>┃ poll (*)          ┃
/// ┗━━━━━━━━━━━━━━━━━━┛ :  ┃ ..                ┃    ┃ ..                ┃
///                      :  ┗━━━━━━━━━━━━━━━━━━━┛    ┗━━━━━━━━━━━━━━━━━━━┛
/// ```
///
/// # Errors
///
/// The returned future resolves to `Err(E)` when the implementation fails
/// to flush its pending messages.
fn poll(&self) -> impl Future<Output = Result<(), E>>;

/// Close this [`Session`], cleaning up any callbacks (e.g. arguments
/// provided to [`Session::handle_request`]) and resources (e.g. views
/// returned by a call to [`Table::view`]).
///
/// This method takes `self` by value, so the [`Session`] cannot be used
/// after it has been closed.
///
/// Dropping a [`Session`] outside of the context of [`Session::close`]
/// will cause a [`tracing`] error-level log to be emitted, but won't fail.
/// The callbacks and resources it owns will, however, leak.
fn close(self) -> impl Future<Output = ()>;
}
/// Reference-counted, thread-safe (`Send + Sync`) callback which receives a
/// byte slice and may fail with a boxed error. [`ProxySession`] stores its
/// `send_response` handler in this form so it can be cloned into the
/// per-request closures it registers.
type ProxyCallback =
Arc<dyn Fn(&[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync>;
/// A [`Session`] implementation which tunnels through another [`Client`].
///
/// Requests handed to this session are decoded and re-issued on the wrapped
/// [`Client`]; the responses are re-encoded and passed to the stored
/// callback. Clones share the same callback, since it is held behind an
/// `Arc`.
#[derive(Clone)]
pub struct ProxySession {
/// The [`Client`] that decoded requests are forwarded to.
parent: Client,
/// Invoked with the encoded bytes of each response received from `parent`.
callback: ProxyCallback,
}
impl ProxySession {
pub async fn new(
client: Client,
send_response: impl Fn(&[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync
+ 'static,
) -> Self {
ProxySession {
parent: client,
callback: Arc::new(send_response),
}
}
}
/// Serialize `response` with `prost` and pass the resulting bytes to
/// `callback`.
///
/// # Arguments
///
/// - `response` The message to serialize.
/// - `callback` The handler which receives the serialized bytes.
///
/// # Errors
///
/// Returns the error reported by `callback`, converted into a
/// [`ClientError`].
fn encode(response: Response, callback: ProxyCallback) -> Result<(), ClientError> {
    // `encode_to_vec` allocates the buffer once at the message's encoded
    // length and has no failure path, unlike encoding into a growing `vec![]`.
    let bytes = response.encode_to_vec();
    callback(&bytes)?;
    Ok(())
}
impl Session<ClientError> for ProxySession {
    /// Decode `request` and re-issue it on the parent [`Client`], arranging
    /// for each response to be encoded and passed to this session's callback.
    ///
    /// A `ViewOnUpdateReq` is registered through `subscribe` with a callback
    /// that can be invoked repeatedly; every other request goes through
    /// `subscribe_once` with a callback that is consumed on first use.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientError`] if `request` cannot be decoded, if the
    /// decoded request carries no `client_req` payload, or if the parent
    /// [`Client`] fails to register the subscription.
    async fn handle_request(&self, request: &[u8]) -> Result<(), ClientError> {
        let req = Request::decode(request)?;

        // `client_req` is optional on the decoded message and `request` comes
        // from the remote peer, so report a missing payload as an error
        // instead of panicking.
        let Some(client_req) = req.client_req.as_ref() else {
            let err: Box<dyn std::error::Error + Send + Sync> =
                "Request is missing `client_req`".into();
            return Err(err.into());
        };

        let callback = self.callback.clone();
        match client_req {
            ClientReq::ViewOnUpdateReq(_) => {
                // This callback may run many times, so each invocation takes
                // its own handle to the shared callback.
                let on_update = move |response| -> Pin<
                    Box<dyn Future<Output = Result<(), ClientError>> + Send>,
                > {
                    let callback = callback.clone();
                    Box::pin(async move { encode(response, callback) })
                };

                self.parent.subscribe(&req, Box::new(on_update)).await?
            },
            _ => {
                // One-shot callback: it takes ownership of `callback`.
                let on_update = move |response| encode(response, callback);
                self.parent
                    .subscribe_once(&req, Box::new(on_update))
                    .await?
            },
        };

        Ok(())
    }

    /// No-op for a proxy session: nothing is queued locally, so this always
    /// resolves to `Ok(())`.
    async fn poll(&self) -> Result<(), ClientError> {
        Ok(())
    }

    /// Consume the session without performing any additional work.
    ///
    /// NOTE(review): subscriptions registered on the parent [`Client`] by
    /// `handle_request` are not explicitly removed here — confirm whether
    /// they need to be released.
    async fn close(self) {}
}