perspective_server/
local_client.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::ops::Deref;
14use std::sync::{Arc, OnceLock};
15
16use async_lock::{RwLock, RwLockReadGuard};
17use perspective_client::*;
18
19use crate::local_session::LocalSession;
20use crate::server::{Server, ServerError, SessionHandler};
21
22#[derive(Clone, Default)]
23struct LocalClientState {
24    client: Arc<OnceLock<Client>>,
25    session: Arc<OnceLock<RwLock<Option<LocalSession>>>>,
26    server: Server,
27}
28
29impl SessionHandler for LocalClientState {
30    async fn send_response<'a>(&'a mut self, msg: &'a [u8]) -> Result<(), ServerError> {
31        self.get_client().handle_response(msg).await?;
32        Ok(())
33    }
34}
35
36impl ClientHandler for LocalClientState {
37    async fn send_request(
38        &self,
39        msg: Vec<u8>,
40    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
41        let session_lock = self.get_session().await;
42        let session = session_lock.as_ref().unwrap();
43        session.handle_request(&msg).await?;
44        session.poll().await?;
45        Ok(())
46    }
47}
48
49impl LocalClientState {
50    fn get_client(&self) -> &Client {
51        self.client.get_or_init(|| Client::new(self.clone()))
52    }
53
54    async fn get_session(&self) -> RwLockReadGuard<'_, Option<LocalSession>> {
55        if self.session.get().is_none() {
56            let session = self.server.new_session(self.clone()).await;
57            self.session
58                .get_or_init(|| RwLock::new(Some(session)))
59                .read()
60                .await
61        } else {
62            self.session.get().unwrap().read().await
63        }
64    }
65}
66
67/// A [`Client`] specialized for connecting to an in-process [`Server`].
68pub struct LocalClient(Option<LocalClientState>);
69
70impl Deref for LocalClient {
71    type Target = Client;
72
73    fn deref(&self) -> &Self::Target {
74        self.0.as_ref().unwrap().get_client()
75    }
76}
77
78impl Drop for LocalClient {
79    fn drop(&mut self) {
80        if let Some(state) = &self.0 {
81            if let Some(session) = state.session.get() {
82                if session.try_read().unwrap().is_some() {
83                    tracing::error!("`Client` dropped without `Client::close`");
84                }
85            } else {
86                tracing::debug!("`Session` dropped before init");
87            }
88        }
89    }
90}
91
92impl LocalClient {
93    /// Create a new [`LocalClient`] instance for a [`Server`].
94    pub fn new(server: &Server) -> Self {
95        let state = LocalClientState {
96            server: server.clone(),
97            client: Arc::default(),
98            session: Arc::default(),
99        };
100
101        LocalClient(Some(state))
102    }
103
104    pub fn take(mut self) -> Result<Client, &'static str> {
105        self.0.take().map(|x| x.get_client().clone()).ok_or("Empty")
106    }
107
108    /// Close this [`LocalClient`]. Dropping a [`LocalClient`] instead of
109    /// calling [`LocalClient::close`] will result in a log error, as this
110    /// will leak!
111    pub async fn close(self) {
112        if let Some(session) = self.0.as_ref().unwrap().session.get() {
113            session.write().await.take().unwrap().close().await
114        } else {
115            tracing::debug!("`Session` dropped before init");
116        }
117    }
118}