perspective_server/server.rs
1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors. ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::error::Error;
15use std::sync::Arc;
16
17use async_lock::RwLock;
18use futures::Future;
19use futures::future::BoxFuture;
20
21use crate::ffi;
22use crate::local_client::LocalClient;
23use crate::local_session::LocalSession;
24
25pub type ServerError = Box<dyn Error + Send + Sync>;
26
27pub type ServerResult<T> = Result<T, ServerError>;
28
29type SessionCallback =
30 Arc<dyn for<'a> Fn(&'a [u8]) -> BoxFuture<'a, Result<(), ServerError>> + Send + Sync>;
31
32type OnPollRequestCallback =
33 Arc<dyn Fn(&Server) -> BoxFuture<'static, Result<(), ServerError>> + Send + Sync>;
34
35/// Use [`SessionHandler`] to implement a callback for messages emitted from
36/// a [`Session`], to be passed to the [`Server::new_session`] constructor.
37///
38/// Alternatively, a [`Session`] can be created from a closure instead via
39/// [`Server::new_session_with_callback`].
40pub trait SessionHandler: Send + Sync {
41 /// Dispatch a message from a [`Server`] for a the [`Session`] that took
42 /// this `SessionHandler` instance as a constructor argument.
43 fn send_response<'a>(
44 &'a mut self,
45 msg: &'a [u8],
46 ) -> impl Future<Output = Result<(), ServerError>> + Send + 'a;
47}
48
49/// An instance of a Perspective server. Each [`Server`] instance is separate,
50/// and does not share [`perspective_client::Table`] (or other) data with other
51/// [`Server`]s.
52#[derive(Clone)]
53pub struct Server {
54 pub(crate) server: Arc<ffi::Server>,
55 pub(crate) callbacks: Arc<RwLock<HashMap<u32, SessionCallback>>>,
56 pub(crate) on_poll_request: Option<OnPollRequestCallback>,
57}
58
59impl std::fmt::Debug for Server {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 let addr = std::ptr::addr_of!(self);
62 write!(f, "Server {addr:?}")?;
63 Ok(())
64 }
65}
66
67impl Server {
68 /// Create a new [`Server`].
69 ///
70 /// # Arguments
71 ///
72 /// - `on_poll_request` A callback function which the `Server` will invoke
73 /// when there are updates that need to be flushed, after which you must
74 /// _eventually_ call [`Server::poll`] (or else no updates will be
75 /// processed). This optimization allows batching updates, depending on
76 /// context.
77 pub fn new(on_poll_request: Option<OnPollRequestCallback>) -> Self {
78 let server = Arc::new(ffi::Server::new(on_poll_request.is_some()));
79 let callbacks = Arc::default();
80 Self {
81 server,
82 callbacks,
83 on_poll_request,
84 }
85 }
86
87 /// An alternative method for creating a new [`Session`] for this
88 /// [`Server`], from a callback closure instead of a via a trait.
89 /// See [`Server::new_session`] for details.
90 ///
91 /// # Arguments
92 ///
93 /// - `send_response` - A function invoked by the [`Server`] when a
94 /// response message needs to be sent to the
95 /// [`perspective_client::Client`].
96 pub async fn new_session_with_callback<F>(&self, send_response: F) -> LocalSession
97 where
98 F: for<'a> Fn(&'a [u8]) -> BoxFuture<'a, Result<(), ServerError>> + 'static + Sync + Send,
99 {
100 let id = self.server.new_session();
101 let server = self.clone();
102 self.callbacks
103 .write()
104 .await
105 .insert(id, Arc::new(send_response));
106
107 LocalSession {
108 id,
109 server,
110 closed: false,
111 }
112 }
113
114 /// Create a [`Session`] for this [`Server`], suitable for exactly one
115 /// [`perspective_client::Client`] (not necessarily in this process). A
116 /// [`Session`] represents the server-side state of a single
117 /// client-to-server connection.
118 ///
119 /// # Arguments
120 ///
121 /// - `session_handler` - An implementor of [`SessionHandler`] which will be
122 /// invoked by the [`Server`] when a response message needs to be sent to
123 /// the [`Client`]. The response itself should be passed to
124 /// [`Client::handle_response`] eventually, though it may-or-may-not be in
125 /// the same process.
126 pub async fn new_session<F>(&self, session_handler: F) -> LocalSession
127 where
128 F: SessionHandler + 'static + Sync + Send + Clone,
129 {
130 self.new_session_with_callback(move |msg| {
131 let mut session_handler = session_handler.clone();
132 Box::pin(async move { session_handler.send_response(msg).await })
133 })
134 .await
135 }
136
137 /// Create a new [`Client`] instance bound to this [`Server`] directly.
138 pub fn new_local_client(&self) -> LocalClient {
139 LocalClient::new(self)
140 }
141
142 /// Flush any pending messages which may have resulted from previous
143 /// [`Session::handle_request`] calls.
144 ///
145 /// [`Server::poll`] only needs to be called if you've implemented
146 /// a custom Perspective [`Server`] and provided the `on_poll_request`
147 /// constructor keyword argument.
148 ///
149 /// Calling [`Session::poll`] may result in the `send_response` parameter
150 /// which was used to construct this (or other) [`Session`] to fire.
151 /// Whenever a [`Session::handle_request`] method is invoked for a
152 /// `perspective_server::Server`, at least one [`Session::poll`] should be
153 /// scheduled to clear other clients message queues.
154 ///
155 /// `poll()` _must_ be called after [`Table::update`] or [`Table::remove`]
156 /// and `on_poll_request` is notified, or the changes will not be applied.
157 pub async fn poll(&self) -> Result<(), ServerError> {
158 let responses = self.server.poll();
159 let mut results = Vec::with_capacity(responses.size());
160 for response in responses.iter_responses() {
161 let cb = self
162 .callbacks
163 .read()
164 .await
165 .get(&response.client_id())
166 .cloned();
167
168 if let Some(f) = cb {
169 results.push(f(response.msg()).await);
170 }
171 }
172
173 results.into_iter().collect()
174 }
175}