1use std::collections::HashMap;
38use std::net::SocketAddr;
39use std::sync::atomic::{AtomicU64, Ordering};
40
41use crate::core::types::MsgId;
42use crate::io::mbuf::{MbufPool, MbufQueue};
43use crate::io::reactor::{ConnRole, Transport};
44use crate::msg::{ConsistencyLevel, Msg, MsgQueue};
45
46use super::NetError;
47
48pub const MAX_CONN_QUEUE_SIZE: usize = 20_000;
56
57#[derive(Debug, Default, Clone)]
63pub struct ConnStats {
64 pub recv_bytes: u64,
66 pub send_bytes: u64,
68 pub recv_msgs: u64,
70 pub send_msgs: u64,
72 pub recv_events: u64,
74 pub send_events: u64,
76}
77
78#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
84pub struct ConnHandle(u64);
85
86impl ConnHandle {
87 #[must_use]
107 pub fn raw(self) -> u64 {
108 self.0
109 }
110}
111
112static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);
113
114fn next_handle() -> ConnHandle {
115 ConnHandle(NEXT_HANDLE.fetch_add(1, Ordering::Relaxed))
116}
117
118#[allow(clippy::struct_excessive_bools)]
120pub struct Conn {
121 handle: ConnHandle,
122 role: ConnRole,
123 transport: Option<Box<dyn Transport>>,
124 peer_addr: Option<SocketAddr>,
125 recv: MbufQueue,
126 send: MbufQueue,
127 imsg_q: MsgQueue,
128 omsg_q: MsgQueue,
129 rmsg: Option<Msg>,
130 smsg: Option<Msg>,
131 stats: ConnStats,
132 eof: bool,
133 done: bool,
134 err: Option<String>,
135 read_consistency: ConsistencyLevel,
136 write_consistency: ConsistencyLevel,
137 same_dc: bool,
138 dyn_mode: bool,
139 dnode_secured: bool,
140 crypto_key_sent: bool,
141 aes_key: Option<[u8; 32]>,
142 outstanding: HashMap<MsgId, MsgId>,
143 pool: MbufPool,
144}
145
146impl std::fmt::Debug for Conn {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 let _ = (
151 &self.transport,
152 &self.read_consistency,
153 &self.write_consistency,
154 &self.same_dc,
155 &self.dyn_mode,
156 &self.dnode_secured,
157 &self.crypto_key_sent,
158 &self.aes_key,
159 &self.outstanding,
160 &self.pool,
161 &self.stats,
162 &self.rmsg,
163 &self.smsg,
164 );
165 f.debug_struct("Conn")
166 .field("handle", &self.handle)
167 .field("role", &self.role)
168 .field("peer_addr", &self.peer_addr)
169 .field("recv_chain", &self.recv.len())
170 .field("send_chain", &self.send.len())
171 .field("imsg_q", &self.imsg_q.len())
172 .field("omsg_q", &self.omsg_q.len())
173 .field("recv_bytes", &self.stats.recv_bytes)
174 .field("send_bytes", &self.stats.send_bytes)
175 .field("eof", &self.eof)
176 .field("done", &self.done)
177 .field("err", &self.err)
178 .field("read_consistency", &self.read_consistency)
179 .field("write_consistency", &self.write_consistency)
180 .field("same_dc", &self.same_dc)
181 .field("dyn_mode", &self.dyn_mode)
182 .field("dnode_secured", &self.dnode_secured)
183 .field("crypto_key_sent", &self.crypto_key_sent)
184 .field("aes_key_set", &self.aes_key.is_some())
185 .field("outstanding", &self.outstanding.len())
186 .finish()
187 }
188}
189
190impl Conn {
191 pub fn new(transport: Box<dyn Transport>, role: ConnRole) -> Self {
211 let peer_addr = transport.peer_addr();
212 Self {
213 handle: next_handle(),
214 role,
215 transport: Some(transport),
216 peer_addr,
217 recv: MbufQueue::new(),
218 send: MbufQueue::new(),
219 imsg_q: MsgQueue::new(),
220 omsg_q: MsgQueue::new(),
221 rmsg: None,
222 smsg: None,
223 stats: ConnStats::default(),
224 eof: false,
225 done: false,
226 err: None,
227 read_consistency: ConsistencyLevel::DcOne,
228 write_consistency: ConsistencyLevel::DcOne,
229 same_dc: true,
230 dyn_mode: matches!(
231 role,
232 ConnRole::DnodePeerProxy | ConnRole::DnodePeerClient | ConnRole::DnodePeerServer
233 ),
234 dnode_secured: false,
235 crypto_key_sent: false,
236 aes_key: None,
237 outstanding: HashMap::new(),
238 pool: MbufPool::default(),
239 }
240 }
241
242 #[must_use]
244 pub fn handle(&self) -> ConnHandle {
245 self.handle
246 }
247
248 #[must_use]
250 pub fn role(&self) -> ConnRole {
251 self.role
252 }
253
254 #[must_use]
256 pub fn peer_addr(&self) -> Option<SocketAddr> {
257 self.peer_addr
258 }
259
260 #[must_use]
262 pub fn stats(&self) -> &ConnStats {
263 &self.stats
264 }
265
266 #[must_use]
268 pub fn recv_chain(&self) -> &MbufQueue {
269 &self.recv
270 }
271
272 pub fn recv_chain_mut(&mut self) -> &mut MbufQueue {
274 &mut self.recv
275 }
276
277 #[must_use]
279 pub fn send_chain(&self) -> &MbufQueue {
280 &self.send
281 }
282
283 pub fn send_chain_mut(&mut self) -> &mut MbufQueue {
285 &mut self.send
286 }
287
288 #[must_use]
290 pub fn imsg_q(&self) -> &MsgQueue {
291 &self.imsg_q
292 }
293
294 pub fn imsg_q_mut(&mut self) -> &mut MsgQueue {
296 &mut self.imsg_q
297 }
298
299 #[must_use]
301 pub fn omsg_q(&self) -> &MsgQueue {
302 &self.omsg_q
303 }
304
305 pub fn omsg_q_mut(&mut self) -> &mut MsgQueue {
307 &mut self.omsg_q
308 }
309
310 #[must_use]
312 pub fn rmsg(&self) -> Option<&Msg> {
313 self.rmsg.as_ref()
314 }
315
316 pub fn rmsg_mut(&mut self) -> Option<&mut Msg> {
318 self.rmsg.as_mut()
319 }
320
321 pub fn take_rmsg(&mut self) -> Option<Msg> {
324 self.rmsg.take()
325 }
326
327 pub fn set_rmsg(&mut self, msg: Option<Msg>) {
329 self.rmsg = msg;
330 }
331
332 #[must_use]
334 pub fn smsg(&self) -> Option<&Msg> {
335 self.smsg.as_ref()
336 }
337
338 pub fn take_smsg(&mut self) -> Option<Msg> {
340 self.smsg.take()
341 }
342
343 pub fn set_smsg(&mut self, msg: Option<Msg>) {
345 self.smsg = msg;
346 }
347
348 #[must_use]
350 pub fn is_eof(&self) -> bool {
351 self.eof
352 }
353
354 pub fn set_eof(&mut self) {
356 self.eof = true;
357 }
358
359 #[must_use]
362 pub fn is_done(&self) -> bool {
363 self.done
364 }
365
366 pub fn set_done(&mut self) {
368 self.done = true;
369 }
370
371 #[must_use]
373 pub fn err(&self) -> Option<&str> {
374 self.err.as_deref()
375 }
376
377 pub fn set_err<S: Into<String>>(&mut self, msg: S) {
379 self.err = Some(msg.into());
380 }
381
382 #[must_use]
385 pub fn read_consistency(&self) -> ConsistencyLevel {
386 self.read_consistency
387 }
388
389 #[must_use]
392 pub fn write_consistency(&self) -> ConsistencyLevel {
393 self.write_consistency
394 }
395
396 pub fn set_read_consistency(&mut self, c: ConsistencyLevel) {
398 self.read_consistency = c;
399 }
400
401 pub fn set_write_consistency(&mut self, c: ConsistencyLevel) {
403 self.write_consistency = c;
404 }
405
406 #[must_use]
408 pub fn same_dc(&self) -> bool {
409 self.same_dc
410 }
411
412 pub fn set_same_dc(&mut self, on: bool) {
415 self.same_dc = on;
416 }
417
418 #[must_use]
421 pub fn dyn_mode(&self) -> bool {
422 self.dyn_mode
423 }
424
425 #[must_use]
428 pub fn dnode_secured(&self) -> bool {
429 self.dnode_secured
430 }
431
432 pub fn set_dnode_secured(&mut self, on: bool) {
434 self.dnode_secured = on;
435 }
436
437 #[must_use]
439 pub fn crypto_key_sent(&self) -> bool {
440 self.crypto_key_sent
441 }
442
443 pub fn set_crypto_key_sent(&mut self, on: bool) {
445 self.crypto_key_sent = on;
446 }
447
448 #[must_use]
451 pub fn aes_key(&self) -> Option<&[u8; 32]> {
452 self.aes_key.as_ref()
453 }
454
455 pub fn set_aes_key(&mut self, key: [u8; 32]) {
457 self.aes_key = Some(key);
458 }
459
460 #[must_use]
462 pub fn outstanding(&self) -> &HashMap<MsgId, MsgId> {
463 &self.outstanding
464 }
465
466 pub fn outstanding_mut(&mut self) -> &mut HashMap<MsgId, MsgId> {
468 &mut self.outstanding
469 }
470
471 #[must_use]
473 pub fn mbuf_pool(&self) -> &MbufPool {
474 &self.pool
475 }
476
477 pub fn set_mbuf_pool(&mut self, pool: MbufPool) {
480 self.pool = pool;
481 }
482
483 pub fn take_transport(&mut self) -> Option<Box<dyn Transport>> {
488 self.transport.take()
489 }
490
491 pub fn set_transport(&mut self, transport: Box<dyn Transport>) {
494 self.peer_addr = transport.peer_addr();
495 self.transport = Some(transport);
496 }
497
498 #[must_use]
500 pub fn has_transport(&self) -> bool {
501 self.transport.is_some()
502 }
503
504 pub fn transport_mut(&mut self) -> Option<&mut Box<dyn Transport>> {
507 self.transport.as_mut()
508 }
509
510 pub fn enqueue_in(&mut self, msg: Msg) -> Result<(), NetError> {
516 if self.imsg_q.len() >= MAX_CONN_QUEUE_SIZE {
517 return Err(NetError::PoolExhausted);
518 }
519 self.imsg_q.push_back(msg);
520 self.stats.recv_msgs += 1;
521 Ok(())
522 }
523
524 pub fn enqueue_out(&mut self, msg: Msg) -> Result<(), NetError> {
530 if self.omsg_q.len() >= MAX_CONN_QUEUE_SIZE {
531 return Err(NetError::PoolExhausted);
532 }
533 self.omsg_q.push_back(msg);
534 self.stats.send_msgs += 1;
535 Ok(())
536 }
537
538 pub fn close(&mut self) {
540 self.transport = None;
541 self.done = true;
542 }
543
544 pub fn run(&mut self) -> Result<(), NetError> {
558 if self.transport.is_none() {
559 return Err(NetError::Closed);
560 }
561 if self.done {
562 return Ok(());
563 }
564 Ok(())
565 }
566
567 pub fn record_recv(&mut self, bytes: usize) {
569 self.stats.recv_bytes += bytes as u64;
570 self.stats.recv_events += 1;
571 }
572
573 pub fn record_send(&mut self, bytes: usize) {
575 self.stats.send_bytes += bytes as u64;
576 self.stats.send_events += 1;
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use super::*;
583 use crate::io::reactor::TcpTransport;
584 use crate::msg::MsgType;
585 use tokio::net::{TcpListener, TcpStream};
586
587 async fn pair() -> (Conn, Conn) {
588 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
589 let addr = listener.local_addr().unwrap();
590 let accept = tokio::spawn(async move {
591 let (s, _) = listener.accept().await.unwrap();
592 s
593 });
594 let client = TcpStream::connect(addr).await.unwrap();
595 let server = accept.await.unwrap();
596 let c = Conn::new(
597 Box::new(TcpTransport::new(client, ConnRole::Client)),
598 ConnRole::Client,
599 );
600 let s = Conn::new(
601 Box::new(TcpTransport::new(server, ConnRole::Server)),
602 ConnRole::Server,
603 );
604 (c, s)
605 }
606
607 #[tokio::test]
608 async fn enqueue_in_and_out() {
609 let (mut c, _s) = pair().await;
610 c.enqueue_in(Msg::new(1, MsgType::ReqRedisGet, true))
611 .unwrap();
612 c.enqueue_out(Msg::new(2, MsgType::RspRedisStatus, false))
613 .unwrap();
614 assert_eq!(c.imsg_q().len(), 1);
615 assert_eq!(c.omsg_q().len(), 1);
616 assert_eq!(c.stats().recv_msgs, 1);
617 assert_eq!(c.stats().send_msgs, 1);
618 }
619
620 #[tokio::test]
621 async fn close_drops_transport() {
622 let (mut c, _s) = pair().await;
623 assert!(c.has_transport());
624 c.close();
625 assert!(!c.has_transport());
626 assert!(c.is_done());
627 }
628
629 #[tokio::test]
630 async fn handle_is_unique() {
631 let (a, b) = pair().await;
632 assert_ne!(a.handle(), b.handle());
633 }
634
635 #[tokio::test]
636 async fn role_seed_drives_dyn_mode() {
637 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
638 let addr = listener.local_addr().unwrap();
639 let _accept = tokio::spawn(async move {
640 let (s, _) = listener.accept().await.unwrap();
641 drop(s);
642 });
643 let s = TcpStream::connect(addr).await.unwrap();
644 let c = Conn::new(
645 Box::new(TcpTransport::new(s, ConnRole::DnodePeerServer)),
646 ConnRole::DnodePeerServer,
647 );
648 assert!(c.dyn_mode());
649 assert!(c.same_dc());
650 }
651}