Skip to main content

gity_daemon/
events.rs

1use async_nng::AsyncSocket;
2use bincode::Options as BincodeOptions;
3use gity_ipc::{bounded_bincode, validate_message_size, DaemonNotification};
4use nng::{
5    options::{protocol::pubsub::Subscribe, Options},
6    Protocol,
7};
8use tokio::sync::mpsc;
9
10use crate::{map_client_error, DaemonError, ServerError, Shutdown};
11
12/// Publishes daemon notifications over a PUB socket.
13pub struct NotificationServer {
14    address: String,
15    receiver: mpsc::UnboundedReceiver<DaemonNotification>,
16}
17
18impl NotificationServer {
19    pub fn new(
20        address: impl Into<String>,
21        receiver: mpsc::UnboundedReceiver<DaemonNotification>,
22    ) -> Self {
23        Self {
24            address: address.into(),
25            receiver,
26        }
27    }
28
29    pub async fn run(mut self, shutdown: Shutdown) -> Result<(), ServerError> {
30        let socket = nng::Socket::new(Protocol::Pub0)?;
31        socket.listen(&self.address)?;
32        let mut async_socket = AsyncSocket::try_from(socket)?;
33
34        loop {
35            tokio::select! {
36                _ = shutdown.wait() => break,
37                message = self.receiver.recv() => match message {
38                    Some(notification) => {
39                        let payload = bounded_bincode()
40                            .serialize(&notification)
41                            .map_err(|err| ServerError::Serialization(err.to_string()))?;
42                        let mut msg = nng::Message::new();
43                        msg.push_back(&payload);
44                        async_socket
45                            .send(msg, None)
46                            .await
47                            .map_err(|(_, err)| ServerError::Socket(err))?;
48                    }
49                    None => break,
50                }
51            }
52        }
53
54        Ok(())
55    }
56}
57
/// Client-side entry point for receiving daemon notifications over SUB sockets.
///
/// Stores only the address to dial; each call to
/// [`NotificationSubscriber::connect`] creates a new socket.
pub struct NotificationSubscriber {
    /// Address dialed when connecting.
    address: String,
}
62
63pub struct NotificationStream {
64    socket: AsyncSocket,
65}
66
67impl NotificationSubscriber {
68    pub fn new(address: impl Into<String>) -> Self {
69        Self {
70            address: address.into(),
71        }
72    }
73
74    pub async fn connect(&self) -> Result<NotificationStream, DaemonError> {
75        let socket = nng::Socket::new(Protocol::Sub0).map_err(map_client_error)?;
76        socket
77            .set_opt::<Subscribe>(Vec::new())
78            .map_err(map_client_error)?;
79        socket.dial(&self.address).map_err(map_client_error)?;
80        let async_socket = AsyncSocket::try_from(socket).map_err(map_client_error)?;
81        Ok(NotificationStream {
82            socket: async_socket,
83        })
84    }
85}
86
87impl NotificationStream {
88    pub async fn next(&mut self) -> Result<DaemonNotification, DaemonError> {
89        let message = self.socket.receive(None).await.map_err(map_client_error)?;
90        let data = message.as_slice();
91
92        validate_message_size(data).map_err(|err| DaemonError::Transport(err.to_string()))?;
93
94        let notification: DaemonNotification = bounded_bincode()
95            .deserialize(data)
96            .map_err(|err| DaemonError::Transport(err.to_string()))?;
97
98        Ok(notification)
99    }
100}