perspective_server/
server.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::error::Error;
15use std::sync::Arc;
16
17use async_lock::RwLock;
18use futures::Future;
19use futures::future::BoxFuture;
20pub use perspective_client::Session;
21
22use crate::ffi;
23use crate::local_client::LocalClient;
24use crate::local_session::LocalSession;
25
26pub type ServerError = Box<dyn Error + Send + Sync>;
27
28pub type ServerResult<T> = Result<T, ServerError>;
29
30type SessionCallback =
31    Arc<dyn for<'a> Fn(&'a [u8]) -> BoxFuture<'a, Result<(), ServerError>> + Send + Sync>;
32
33type OnPollRequestCallback =
34    Arc<dyn Fn(&Server) -> BoxFuture<'static, Result<(), ServerError>> + Send + Sync>;
35
36/// Use [`SessionHandler`] to implement a callback for messages emitted from
37/// a [`Session`], to be passed to the [`Server::new_session`] constructor.
38///
39/// Alternatively, a [`Session`] can be created from a closure instead via
40/// [`Server::new_session_with_callback`].
41pub trait SessionHandler: Send + Sync {
42    /// Dispatch a message from a [`Server`] for a the [`Session`] that took
43    /// this `SessionHandler` instance as a constructor argument.
44    fn send_response<'a>(
45        &'a mut self,
46        msg: &'a [u8],
47    ) -> impl Future<Output = Result<(), ServerError>> + Send + 'a;
48}
49
50/// An instance of a Perspective server. Each [`Server`] instance is separate,
51/// and does not share [`perspective_client::Table`] (or other) data with other
52/// [`Server`]s.
53#[derive(Clone)]
54pub struct Server {
55    pub(crate) server: Arc<ffi::Server>,
56    pub(crate) callbacks: Arc<RwLock<HashMap<u32, SessionCallback>>>,
57
58    pub(crate) on_poll_request: Option<OnPollRequestCallback>,
59}
60
61impl std::fmt::Debug for Server {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        let addr = std::ptr::addr_of!(self);
64        write!(f, "Server {:?}", addr)?;
65        Ok(())
66    }
67}
68
69impl Server {
70    pub fn new(on_poll_request: Option<OnPollRequestCallback>) -> Self {
71        let server = Arc::new(ffi::Server::new(on_poll_request.is_some()));
72        let callbacks = Arc::default();
73        Self {
74            server,
75            callbacks,
76            on_poll_request,
77        }
78    }
79
80    /// An alternative method for creating a new [`Session`] for this
81    /// [`Server`], from a callback closure instead of a via a trait.
82    /// See [`Server::new_session`] for details.
83    ///
84    /// # Arguments
85    ///
86    /// - `send_response` -  A function invoked by the [`Server`] when a
87    ///   response message needs to be sent to the
88    ///   [`perspective_client::Client`].
89    pub async fn new_session_with_callback<F>(&self, send_response: F) -> LocalSession
90    where
91        F: for<'a> Fn(&'a [u8]) -> BoxFuture<'a, Result<(), ServerError>> + 'static + Sync + Send,
92    {
93        let id = self.server.new_session();
94        let server = self.clone();
95        self.callbacks
96            .write()
97            .await
98            .insert(id, Arc::new(send_response));
99
100        LocalSession {
101            id,
102            server,
103            closed: false,
104        }
105    }
106
107    /// Create a [`Session`] for this [`Server`], suitable for exactly one
108    /// [`perspective_client::Client`] (not necessarily in this process). A
109    /// [`Session`] represents the server-side state of a single
110    /// client-to-server connection.
111    ///
112    /// # Arguments
113    ///
114    /// - `session_handler` - An implementor of [`SessionHandler`] which will be
115    ///   invoked by the [`Server`] when a response message needs to be sent to
116    ///   the [`Client`]. The response itself should be passed to
117    ///   [`Client::handle_response`] eventually, though it may-or-may-not be in
118    ///   the same process.
119    pub async fn new_session<F>(&self, session_handler: F) -> LocalSession
120    where
121        F: SessionHandler + 'static + Sync + Send + Clone,
122    {
123        self.new_session_with_callback(move |msg| {
124            let mut session_handler = session_handler.clone();
125            Box::pin(async move { session_handler.send_response(msg).await })
126        })
127        .await
128    }
129
130    pub fn new_local_client(&self) -> LocalClient {
131        LocalClient::new(self)
132    }
133
134    /// Flush any pending messages which may have resulted from previous
135    /// [`Session::handle_request`] calls.
136    ///
137    /// Calling [`Session::poll`] may result in the `send_response` parameter
138    /// which was used to construct this (or other) [`Session`] to fire.
139    /// Whenever a [`Session::handle_request`] method is invoked for a
140    /// `perspective_server::Server`, at least one [`Session::poll`] should be
141    /// scheduled to clear other clients message queues.
142    ///
143    /// `poll()` _must_ be called after [`Table::update`] or [`Table::remove`]
144    /// and `on_poll_request` set, or these changes will not be applied.
145    pub async fn poll(&self) -> Result<(), ServerError> {
146        let responses = self.server.poll();
147        let mut results = Vec::with_capacity(responses.size());
148        for response in responses.iter_responses() {
149            let cb = self
150                .callbacks
151                .read()
152                .await
153                .get(&response.client_id())
154                .cloned();
155
156            if let Some(f) = cb {
157                results.push(f(response.msg()).await);
158            }
159        }
160
161        results.into_iter().collect()
162    }
163}