zeromq/
rep.rs

1use crate::codec::*;
2use crate::endpoint::Endpoint;
3use crate::error::*;
4use crate::fair_queue::{FairQueue, QueueInner};
5use crate::transport::AcceptStopHandle;
6use crate::*;
7use crate::{SocketType, ZmqResult};
8
9use async_trait::async_trait;
10use dashmap::DashMap;
11use futures_util::{SinkExt, StreamExt};
12use parking_lot::Mutex;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16
17struct RepPeer {
18    pub(crate) _identity: PeerIdentity,
19    pub(crate) send_queue: ZmqFramedWrite,
20}
21
22struct RepSocketBackend {
23    pub(crate) peers: DashMap<PeerIdentity, RepPeer>,
24    fair_queue_inner: Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>,
25    socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
26    socket_options: SocketOptions,
27}
28
29pub struct RepSocket {
30    backend: Arc<RepSocketBackend>,
31    envelope: Option<ZmqMessage>,
32    current_request: Option<PeerIdentity>,
33    fair_queue: FairQueue<ZmqFramedRead, PeerIdentity>,
34    binds: HashMap<Endpoint, AcceptStopHandle>,
35}
36
37impl Drop for RepSocket {
38    fn drop(&mut self) {
39        self.backend.shutdown();
40    }
41}
42
43#[async_trait]
44impl Socket for RepSocket {
45    fn with_options(options: SocketOptions) -> Self {
46        let fair_queue = FairQueue::new(true);
47        Self {
48            backend: Arc::new(RepSocketBackend {
49                peers: DashMap::new(),
50                fair_queue_inner: fair_queue.inner(),
51                socket_monitor: Mutex::new(None),
52                socket_options: options,
53            }),
54            envelope: None,
55            current_request: None,
56            fair_queue,
57            binds: HashMap::new(),
58        }
59    }
60
61    fn backend(&self) -> Arc<dyn MultiPeerBackend> {
62        self.backend.clone()
63    }
64
65    fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
66        &mut self.binds
67    }
68
69    fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
70        let (sender, receiver) = mpsc::channel(1024);
71        self.backend.socket_monitor.lock().replace(sender);
72        receiver
73    }
74}
75
76#[async_trait]
77impl MultiPeerBackend for RepSocketBackend {
78    async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
79        let (recv_queue, send_queue) = io.into_parts();
80
81        self.peers.insert(
82            peer_id.clone(),
83            RepPeer {
84                _identity: peer_id.clone(),
85                send_queue,
86            },
87        );
88        self.fair_queue_inner
89            .lock()
90            .insert(peer_id.clone(), recv_queue);
91    }
92
93    fn peer_disconnected(&self, peer_id: &PeerIdentity) {
94        if let Some(monitor) = self.monitor().lock().as_mut() {
95            let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
96        }
97        self.peers.remove(peer_id);
98    }
99}
100
101impl SocketBackend for RepSocketBackend {
102    fn socket_type(&self) -> SocketType {
103        SocketType::REP
104    }
105
106    fn socket_options(&self) -> &SocketOptions {
107        &self.socket_options
108    }
109
110    fn shutdown(&self) {
111        self.peers.clear();
112    }
113
114    fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
115        &self.socket_monitor
116    }
117}
118
119#[async_trait]
120impl SocketSend for RepSocket {
121    async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> {
122        match self.current_request.take() {
123            Some(peer_id) => {
124                if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
125                    if let Some(envelope) = self.envelope.take() {
126                        message.prepend(&envelope);
127                    }
128                    peer.send_queue.send(Message::Message(message)).await?;
129                    Ok(())
130                } else {
131                    Err(ZmqError::ReturnToSender {
132                        reason: "Client disconnected",
133                        message,
134                    })
135                }
136            }
137            None => Err(ZmqError::ReturnToSender {
138                reason: "Unable to send reply. No request in progress",
139                message,
140            }),
141        }
142    }
143}
144
145#[async_trait]
146impl SocketRecv for RepSocket {
147    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
148        loop {
149            match self.fair_queue.next().await {
150                Some((peer_id, Ok(message))) => match message {
151                    Message::Message(mut m) => {
152                        assert!(m.len() > 1);
153                        let mut at = 1;
154                        for (index, frame) in m.iter().enumerate() {
155                            if frame.is_empty() {
156                                // Include delimiter in envelope.
157                                at = index + 1;
158                                break;
159                            }
160                        }
161                        let data = m.split_off(at);
162                        self.envelope = Some(m);
163                        self.current_request = Some(peer_id);
164                        return Ok(data);
165                    }
166                    _ => todo!(),
167                },
168                Some((_peer_id, _)) => todo!(),
169                None => return Err(ZmqError::NoMessage),
170            };
171        }
172    }
173}