1use crate::codec::*;
2use crate::endpoint::Endpoint;
3use crate::error::*;
4use crate::fair_queue::{FairQueue, QueueInner};
5use crate::transport::AcceptStopHandle;
6use crate::*;
7use crate::{SocketType, ZmqResult};
8
9use async_trait::async_trait;
10use dashmap::DashMap;
11use futures_util::{SinkExt, StreamExt};
12use parking_lot::Mutex;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16
17struct RepPeer {
18 pub(crate) _identity: PeerIdentity,
19 pub(crate) send_queue: ZmqFramedWrite,
20}
21
22struct RepSocketBackend {
23 pub(crate) peers: DashMap<PeerIdentity, RepPeer>,
24 fair_queue_inner: Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>,
25 socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
26 socket_options: SocketOptions,
27}
28
29pub struct RepSocket {
30 backend: Arc<RepSocketBackend>,
31 envelope: Option<ZmqMessage>,
32 current_request: Option<PeerIdentity>,
33 fair_queue: FairQueue<ZmqFramedRead, PeerIdentity>,
34 binds: HashMap<Endpoint, AcceptStopHandle>,
35}
36
37impl Drop for RepSocket {
38 fn drop(&mut self) {
39 self.backend.shutdown();
40 }
41}
42
43#[async_trait]
44impl Socket for RepSocket {
45 fn with_options(options: SocketOptions) -> Self {
46 let fair_queue = FairQueue::new(true);
47 Self {
48 backend: Arc::new(RepSocketBackend {
49 peers: DashMap::new(),
50 fair_queue_inner: fair_queue.inner(),
51 socket_monitor: Mutex::new(None),
52 socket_options: options,
53 }),
54 envelope: None,
55 current_request: None,
56 fair_queue,
57 binds: HashMap::new(),
58 }
59 }
60
61 fn backend(&self) -> Arc<dyn MultiPeerBackend> {
62 self.backend.clone()
63 }
64
65 fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
66 &mut self.binds
67 }
68
69 fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
70 let (sender, receiver) = mpsc::channel(1024);
71 self.backend.socket_monitor.lock().replace(sender);
72 receiver
73 }
74}
75
76#[async_trait]
77impl MultiPeerBackend for RepSocketBackend {
78 async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
79 let (recv_queue, send_queue) = io.into_parts();
80
81 self.peers.insert(
82 peer_id.clone(),
83 RepPeer {
84 _identity: peer_id.clone(),
85 send_queue,
86 },
87 );
88 self.fair_queue_inner
89 .lock()
90 .insert(peer_id.clone(), recv_queue);
91 }
92
93 fn peer_disconnected(&self, peer_id: &PeerIdentity) {
94 if let Some(monitor) = self.monitor().lock().as_mut() {
95 let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
96 }
97 self.peers.remove(peer_id);
98 }
99}
100
101impl SocketBackend for RepSocketBackend {
102 fn socket_type(&self) -> SocketType {
103 SocketType::REP
104 }
105
106 fn socket_options(&self) -> &SocketOptions {
107 &self.socket_options
108 }
109
110 fn shutdown(&self) {
111 self.peers.clear();
112 }
113
114 fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
115 &self.socket_monitor
116 }
117}
118
119#[async_trait]
120impl SocketSend for RepSocket {
121 async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> {
122 match self.current_request.take() {
123 Some(peer_id) => {
124 if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
125 if let Some(envelope) = self.envelope.take() {
126 message.prepend(&envelope);
127 }
128 peer.send_queue.send(Message::Message(message)).await?;
129 Ok(())
130 } else {
131 Err(ZmqError::ReturnToSender {
132 reason: "Client disconnected",
133 message,
134 })
135 }
136 }
137 None => Err(ZmqError::ReturnToSender {
138 reason: "Unable to send reply. No request in progress",
139 message,
140 }),
141 }
142 }
143}
144
145#[async_trait]
146impl SocketRecv for RepSocket {
147 async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
148 loop {
149 match self.fair_queue.next().await {
150 Some((peer_id, Ok(message))) => match message {
151 Message::Message(mut m) => {
152 assert!(m.len() > 1);
153 let mut at = 1;
154 for (index, frame) in m.iter().enumerate() {
155 if frame.is_empty() {
156 at = index + 1;
158 break;
159 }
160 }
161 let data = m.split_off(at);
162 self.envelope = Some(m);
163 self.current_request = Some(peer_id);
164 return Ok(data);
165 }
166 _ => todo!(),
167 },
168 Some((_peer_id, _)) => todo!(),
169 None => return Err(ZmqError::NoMessage),
170 };
171 }
172 }
173}