1use async_nng::AsyncSocket;
2use bincode::Options as BincodeOptions;
3use gity_ipc::{bounded_bincode, validate_message_size, DaemonNotification};
4use nng::{
5 options::{protocol::pubsub::Subscribe, Options},
6 Protocol,
7};
8use tokio::sync::mpsc;
9
10use crate::{map_client_error, DaemonError, ServerError, Shutdown};
11
12pub struct NotificationServer {
14 address: String,
15 receiver: mpsc::UnboundedReceiver<DaemonNotification>,
16}
17
18impl NotificationServer {
19 pub fn new(
20 address: impl Into<String>,
21 receiver: mpsc::UnboundedReceiver<DaemonNotification>,
22 ) -> Self {
23 Self {
24 address: address.into(),
25 receiver,
26 }
27 }
28
29 pub async fn run(mut self, shutdown: Shutdown) -> Result<(), ServerError> {
30 let socket = nng::Socket::new(Protocol::Pub0)?;
31 socket.listen(&self.address)?;
32 let mut async_socket = AsyncSocket::try_from(socket)?;
33
34 loop {
35 tokio::select! {
36 _ = shutdown.wait() => break,
37 message = self.receiver.recv() => match message {
38 Some(notification) => {
39 let payload = bounded_bincode()
40 .serialize(¬ification)
41 .map_err(|err| ServerError::Serialization(err.to_string()))?;
42 let mut msg = nng::Message::new();
43 msg.push_back(&payload);
44 async_socket
45 .send(msg, None)
46 .await
47 .map_err(|(_, err)| ServerError::Socket(err))?;
48 }
49 None => break,
50 }
51 }
52 }
53
54 Ok(())
55 }
56}
57
58pub struct NotificationSubscriber {
60 address: String,
61}
62
63pub struct NotificationStream {
64 socket: AsyncSocket,
65}
66
67impl NotificationSubscriber {
68 pub fn new(address: impl Into<String>) -> Self {
69 Self {
70 address: address.into(),
71 }
72 }
73
74 pub async fn connect(&self) -> Result<NotificationStream, DaemonError> {
75 let socket = nng::Socket::new(Protocol::Sub0).map_err(map_client_error)?;
76 socket
77 .set_opt::<Subscribe>(Vec::new())
78 .map_err(map_client_error)?;
79 socket.dial(&self.address).map_err(map_client_error)?;
80 let async_socket = AsyncSocket::try_from(socket).map_err(map_client_error)?;
81 Ok(NotificationStream {
82 socket: async_socket,
83 })
84 }
85}
86
87impl NotificationStream {
88 pub async fn next(&mut self) -> Result<DaemonNotification, DaemonError> {
89 let message = self.socket.receive(None).await.map_err(map_client_error)?;
90 let data = message.as_slice();
91
92 validate_message_size(data).map_err(|err| DaemonError::Transport(err.to_string()))?;
93
94 let notification: DaemonNotification = bounded_bincode()
95 .deserialize(data)
96 .map_err(|err| DaemonError::Transport(err.to_string()))?;
97
98 Ok(notification)
99 }
100}