perspective_server/
local_client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

use std::ops::Deref;
use std::sync::{Arc, OnceLock};

use async_lock::{RwLock, RwLockReadGuard};
use perspective_client::*;

use crate::local_session::LocalSession;
use crate::server::{Server, ServerError, SessionHandler};

/// Shared state behind a `LocalClient`: the in-process [`Server`] together
/// with the lazily-created [`Client`] and [`LocalSession`] that talk to it.
///
/// The `client` and `session` cells live behind `Arc`s, so every clone of this
/// state observes the same client and session. A clone is handed to
/// `Client::new` and to `Server::new_session` when each is first created.
#[derive(Clone, Default)]
struct LocalClientState {
    /// The client, created on first call to `get_client`.
    client: Arc<OnceLock<Client>>,
    /// The session, created on first call to `get_session`. The inner
    /// `Option` is taken (left as `None`) by `LocalClient::close`.
    session: Arc<OnceLock<RwLock<Option<LocalSession>>>>,
    /// The server on which the session is opened.
    server: Server,
}

impl SessionHandler for LocalClientState {
    /// Hands a serialized response produced by the server to the [`Client`]
    /// owned by this state, converting any client-side failure into a
    /// [`ServerError`].
    async fn send_response<'a>(&'a mut self, msg: &'a [u8]) -> Result<(), ServerError> {
        let client = self.get_client();
        match client.handle_response(msg).await {
            // The success payload is not needed by the server side.
            Ok(_) => Ok(()),
            Err(failure) => Err(failure.into()),
        }
    }
}

impl ClientHandler for LocalClientState {
    /// Forwards a serialized request from the [`Client`] to the local
    /// session, then calls `poll` on that session.
    ///
    /// # Errors
    ///
    /// Returns an error if the session slot is empty (the session has
    /// already been taken by `LocalClient::close`), or if the session's
    /// `handle_request` or `poll` call fails.
    async fn send_request<'a>(
        &'a self,
        msg: &'a [u8],
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let session_lock = self.get_session().await;

        // A `Client` holding a clone of this state can outlive
        // `LocalClient::close`, which empties the slot. Report that as an
        // error to the caller rather than panicking.
        let session = session_lock
            .as_ref()
            .ok_or("`LocalClient` session is closed")?;

        session.handle_request(msg).await?;
        session.poll().await?;
        Ok(())
    }
}

impl LocalClientState {
    /// Returns the [`Client`] bound to this state, constructing it from a
    /// clone of `self` on first access.
    fn get_client(&self) -> &Client {
        self.client.get_or_init(|| Client::new(self.clone()))
    }

    /// Returns a read guard over the session slot, opening a session on the
    /// server on first access.
    ///
    /// The guarded value is `None` once `LocalClient::close` has taken the
    /// session out of the slot.
    async fn get_session(&self) -> RwLockReadGuard<'_, Option<LocalSession>> {
        if let Some(slot) = self.session.get() {
            return slot.read().await;
        }

        let session = self.server.new_session(self.clone()).await;

        // Several callers may have observed the empty cell and each opened a
        // session while awaiting above. Only one can be stored; a session
        // that lost the race is closed explicitly instead of being silently
        // dropped.
        if let Err(rejected) = self.session.set(RwLock::new(Some(session))) {
            if let Some(redundant) = rejected.into_inner() {
                redundant.close().await;
            }
        }

        self.session
            .get()
            .expect("session cell is initialized by this point")
            .read()
            .await
    }
}

/// A [`Client`] specialized for connecting to an in-process [`Server`].
///
/// Dereferences to the underlying [`Client`]. Call `LocalClient::close` when
/// finished; dropping a `LocalClient` whose session is still open logs an
/// error.
pub struct LocalClient(LocalClientState);

impl Deref for LocalClient {
    type Target = Client;

    fn deref(&self) -> &Self::Target {
        self.0.get_client()
    }
}

impl Drop for LocalClient {
    /// Logs an error if the session is still open when the client is
    /// dropped, i.e. `LocalClient::close` was never called.
    ///
    /// This never panics: if the session lock cannot be acquired without
    /// waiting, the check is skipped.
    fn drop(&mut self) {
        let Some(session) = self.0.session.get() else {
            tracing::debug!("`Session` dropped before init");
            return;
        };

        match session.try_read() {
            Some(slot) if slot.is_some() => {
                tracing::error!("`Client` dropped without `Client::close`");
            },
            // Slot already emptied: the session was closed properly.
            Some(_) => {},
            // Panicking in `drop` can abort the process during unwinding, so
            // a lock that is unavailable is only reported.
            None => {
                tracing::debug!("`Session` lock unavailable during drop; skipping close check");
            },
        }
    }
}

impl LocalClient {
    /// Create a new [`LocalClient`] instance for a [`Server`].
    pub fn new(server: &Server) -> Self {
        let state = LocalClientState {
            server: server.clone(),
            client: Arc::default(),
            session: Arc::default(),
        };

        LocalClient(state)
    }

    /// Close this [`LocalClient`]. Dropping a [`LocalClient`] instead of
    /// calling [`LocalClient::close`] will result in a log error, as this
    /// will leak!
    pub async fn close(self) {
        if let Some(session) = self.0.session.get() {
            session.write().await.take().unwrap().close().await
        } else {
            tracing::debug!("`Session` dropped before init");
        }
    }
}