1use std::net::SocketAddr;
2use std::path::Path;
3use std::time::Duration;
4use thiserror::Error;
5use tracing::{debug, error};
6
7use tokio::net::{TcpListener, ToSocketAddrs};
8
9#[cfg(unix)]
10use tokio::net::{UnixListener, unix::SocketAddr as UnixSocketAddr};
11
12use crate::miniprotocols::handshake::n2n::VersionData;
13use crate::miniprotocols::handshake::{Confirmation, VersionNumber, VersionTable, n2c, n2n};
14
15use crate::miniprotocols::{
16 PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_MSG_NOTIFICATION,
17 PROTOCOL_N2C_MSG_SUBMISSION, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2C_TX_MONITOR,
18 PROTOCOL_N2C_TX_SUBMISSION, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC,
19 PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE, PROTOCOL_N2N_PEER_SHARING,
20 PROTOCOL_N2N_TX_SUBMISSION, blockfetch, chainsync, handshake, keepalive, localmsgnotification,
21 localmsgsubmission, localstate, localtxsubmission, peersharing, txmonitor, txsubmission,
22};
23
24use crate::multiplexer::{self, Bearer, RunningPlexer};
25
/// Errors returned by the facades defined in this module.
#[derive(Debug, Error)]
pub enum Error {
    /// Wraps an error reported by the multiplexer.
    #[error("error in multiplexer")]
    PlexerFailure(#[source] multiplexer::Error),

    /// The underlying bearer could not be connected or accepted; wraps the I/O error.
    #[error("error connecting bearer")]
    ConnectFailure(#[source] tokio::io::Error),

    /// The handshake exchange itself returned an error.
    #[error("handshake protocol error")]
    HandshakeProtocol(handshake::Error),

    /// A keep-alive round trip failed inside the client loop.
    #[error("keepalive client loop error")]
    KeepAliveClientLoop(keepalive::ClientError),

    /// A keep-alive round trip failed inside the server loop.
    #[error("keepalive server loop error")]
    KeepAliveServerLoop(keepalive::ServerError),

    /// The handshake finished without an accepted version.
    #[error("handshake version not accepted")]
    IncompatibleVersion,
}
53
/// Interval, in seconds, between keep-alive requests sent by the loop that
/// `PeerClient::connect` spawns.
pub const DEFAULT_KEEP_ALIVE_INTERVAL_SEC: u64 = 20;
56
/// Join handle of a spawned keep-alive loop; resolves to the loop's result.
pub type KeepAliveHandle = tokio::task::JoinHandle<Result<(), Error>>;
59
/// A keep-alive loop that has been configured but not yet spawned.
pub enum KeepAliveLoop {
    /// Client side: the keep-alive client plus the interval between requests.
    Client(keepalive::Client, Duration),
    /// Server side: the keep-alive server that answers incoming requests.
    Server(keepalive::Server),
}
67
68impl KeepAliveLoop {
69 pub fn client(client: keepalive::Client, interval: Duration) -> Self {
71 Self::Client(client, interval)
72 }
73
74 pub fn server(server: keepalive::Server) -> Self {
76 Self::Server(server)
77 }
78
79 pub async fn run_client(
81 mut client: keepalive::Client,
82 interval: Duration,
83 ) -> Result<(), Error> {
84 let mut interval = tokio::time::interval(interval);
85
86 loop {
87 interval.tick().await;
88 debug!("sending keepalive request");
89
90 client
91 .keepalive_roundtrip()
92 .await
93 .map_err(Error::KeepAliveClientLoop)?;
94 }
95 }
96
97 pub async fn run_server(mut server: keepalive::Server) -> Result<(), Error> {
99 loop {
100 debug!("waiting keepalive request");
101
102 server
103 .keepalive_roundtrip()
104 .await
105 .map_err(Error::KeepAliveServerLoop)?;
106 }
107 }
108
109 pub fn spawn(self) -> KeepAliveHandle {
111 match self {
112 KeepAliveLoop::Client(client, interval) => {
113 tokio::spawn(Self::run_client(client, interval))
114 }
115 KeepAliveLoop::Server(server) => tokio::spawn(Self::run_server(server)),
116 }
117 }
118}
119
/// Client side of a node-to-node connection, bundling one client per
/// mini-protocol subscribed on a single multiplexed bearer.
pub struct PeerClient {
    /// Handle to the running multiplexer that carries every channel below.
    pub plexer: RunningPlexer,
    /// Handle of the keep-alive loop spawned by `connect`.
    pub keepalive: KeepAliveHandle,
    /// Chain-sync client.
    pub chainsync: chainsync::N2NClient,
    /// Block-fetch client.
    pub blockfetch: blockfetch::Client,
    /// Tx-submission client.
    pub txsubmission: txsubmission::Client,
    /// Peer-sharing client.
    pub peersharing: peersharing::Client,
}
136
137impl PeerClient {
138 pub async fn connect(addr: impl ToSocketAddrs, magic: u64) -> Result<Self, Error> {
140 let bearer = Bearer::connect_tcp(addr)
141 .await
142 .map_err(Error::ConnectFailure)?;
143
144 let mut plexer = multiplexer::Plexer::new(bearer);
145
146 let channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE);
147 let mut handshake = handshake::Client::new(channel);
148
149 let cs_channel = plexer.subscribe_client(PROTOCOL_N2N_CHAIN_SYNC);
150 let bf_channel = plexer.subscribe_client(PROTOCOL_N2N_BLOCK_FETCH);
151 let txsub_channel = plexer.subscribe_client(PROTOCOL_N2N_TX_SUBMISSION);
152 let peersharing_channel = plexer.subscribe_client(PROTOCOL_N2N_PEER_SHARING);
153
154 let channel = plexer.subscribe_client(PROTOCOL_N2N_KEEP_ALIVE);
155 let keepalive = keepalive::Client::new(channel);
156
157 let plexer = plexer.spawn();
158
159 let versions = handshake::n2n::VersionTable::v7_and_above(magic);
160
161 let handshake = handshake
162 .handshake(versions)
163 .await
164 .map_err(Error::HandshakeProtocol)?;
165
166 if let handshake::Confirmation::Rejected(reason) = handshake {
167 error!(?reason, "handshake refused");
168 return Err(Error::IncompatibleVersion);
169 }
170
171 let keepalive = KeepAliveLoop::client(
172 keepalive,
173 Duration::from_secs(DEFAULT_KEEP_ALIVE_INTERVAL_SEC),
174 )
175 .spawn();
176
177 let client = Self {
178 plexer,
179 keepalive,
180 chainsync: chainsync::Client::new(cs_channel),
181 blockfetch: blockfetch::Client::new(bf_channel),
182 txsubmission: txsubmission::Client::new(txsub_channel),
183 peersharing: peersharing::Client::new(peersharing_channel),
184 };
185
186 Ok(client)
187 }
188
189 pub async fn handshake_query(
192 addr: impl ToSocketAddrs,
193 magic: u64,
194 ) -> Result<VersionTable<VersionData>, Error> {
195 let bearer = Bearer::connect_tcp(addr)
196 .await
197 .map_err(Error::ConnectFailure)?;
198
199 let mut plexer = multiplexer::Plexer::new(bearer);
200
201 let channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE);
202 let mut handshake = handshake::Client::new(channel);
203
204 let _plexer = plexer.spawn();
205
206 let versions = handshake::n2n::VersionTable::v7_and_above_with_query(magic, true);
207
208 let handshake = handshake
209 .handshake(versions)
210 .await
211 .map_err(Error::HandshakeProtocol)?;
212
213 let version_table = match handshake {
214 handshake::Confirmation::QueryReply(version_table) => {
215 debug!("handshake query reply received");
216 version_table
217 }
218 handshake::Confirmation::Accepted(_, _) => {
219 error!("handshake accepted when we expected query reply");
220 return Err(Error::HandshakeProtocol(handshake::Error::InvalidInbound));
221 }
222 handshake::Confirmation::Rejected(reason) => {
223 error!(?reason, "handshake refused");
224 return Err(Error::IncompatibleVersion);
225 }
226 };
227
228 Ok(version_table)
229 }
230
231 pub fn chainsync(&mut self) -> &mut chainsync::N2NClient {
233 &mut self.chainsync
234 }
235
236 pub async fn with_chainsync<T, O, Fut>(&mut self, op: T) -> tokio::task::JoinHandle<O>
238 where
239 T: FnOnce(&mut chainsync::N2NClient) -> Fut,
240 Fut: std::future::Future<Output = O> + Send + 'static,
241 O: Send + 'static,
242 {
243 tokio::spawn(op(&mut self.chainsync))
244 }
245
246 pub fn blockfetch(&mut self) -> &mut blockfetch::Client {
248 &mut self.blockfetch
249 }
250
251 pub fn txsubmission(&mut self) -> &mut txsubmission::Client {
253 &mut self.txsubmission
254 }
255
256 pub fn peersharing(&mut self) -> &mut peersharing::Client {
258 &mut self.peersharing
259 }
260
261 pub async fn abort(self) {
263 self.plexer.abort().await
264 }
265}
266
/// Server side of a node-to-node connection, bundling one server per
/// mini-protocol subscribed on a single multiplexed bearer.
pub struct PeerServer {
    /// Handle to the running multiplexer that carries every channel below.
    pub plexer: RunningPlexer,
    /// Handshake server.
    pub handshake: handshake::N2NServer,
    /// Chain-sync server.
    pub chainsync: chainsync::N2NServer,
    /// Block-fetch server.
    pub blockfetch: blockfetch::Server,
    /// Tx-submission server.
    pub txsubmission: txsubmission::Server,
    /// Keep-alive server.
    pub keepalive: keepalive::Server,
    /// Peer-sharing server.
    pub peersharing: peersharing::Server,
    /// Remote address of the peer; only set by `accept`.
    accepted_address: Option<SocketAddr>,
    /// Version number and data agreed during the handshake; only set by `accept`.
    accepted_version: Option<(u64, n2n::VersionData)>,
}
287
288impl PeerServer {
289 pub fn new(bearer: Bearer) -> Self {
291 let mut plexer = multiplexer::Plexer::new(bearer);
292
293 let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE);
294 let cs_channel = plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC);
295 let bf_channel = plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH);
296 let txsub_channel = plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION);
297 let keepalive_channel = plexer.subscribe_server(PROTOCOL_N2N_KEEP_ALIVE);
298 let peersharing_channel = plexer.subscribe_server(PROTOCOL_N2N_PEER_SHARING);
299
300 let hs = handshake::N2NServer::new(hs_channel);
301 let cs = chainsync::N2NServer::new(cs_channel);
302 let bf = blockfetch::Server::new(bf_channel);
303 let txsub = txsubmission::Server::new(txsub_channel);
304 let keepalive = keepalive::Server::new(keepalive_channel);
305 let peersharing = peersharing::Server::new(peersharing_channel);
306
307 let plexer = plexer.spawn();
308
309 Self {
310 plexer,
311 handshake: hs,
312 chainsync: cs,
313 blockfetch: bf,
314 txsubmission: txsub,
315 keepalive,
316 peersharing,
317 accepted_address: None,
318 accepted_version: None,
319 }
320 }
321
322 pub async fn accept(listener: &TcpListener, magic: u64) -> Result<Self, Error> {
324 let (bearer, address) = Bearer::accept_tcp(listener)
325 .await
326 .map_err(Error::ConnectFailure)?;
327
328 let mut client = Self::new(bearer);
329
330 let accepted_version = client
331 .handshake()
332 .handshake(n2n::VersionTable::v7_and_above(magic))
333 .await
334 .map_err(Error::HandshakeProtocol)?;
335
336 if let Some((version, data)) = accepted_version {
337 client.accepted_address = Some(address);
338 client.accepted_version = Some((version, data));
339 Ok(client)
340 } else {
341 client.abort().await;
342 Err(Error::IncompatibleVersion)
343 }
344 }
345
346 pub fn handshake(&mut self) -> &mut handshake::N2NServer {
348 &mut self.handshake
349 }
350
351 pub fn chainsync(&mut self) -> &mut chainsync::N2NServer {
353 &mut self.chainsync
354 }
355
356 pub fn blockfetch(&mut self) -> &mut blockfetch::Server {
358 &mut self.blockfetch
359 }
360
361 pub fn txsubmission(&mut self) -> &mut txsubmission::Server {
363 &mut self.txsubmission
364 }
365
366 pub fn keepalive(&mut self) -> &mut keepalive::Server {
368 &mut self.keepalive
369 }
370
371 pub fn peersharing(&mut self) -> &mut peersharing::Server {
373 &mut self.peersharing
374 }
375
376 pub fn accepted_address(&self) -> Option<&SocketAddr> {
378 self.accepted_address.as_ref()
379 }
380
381 pub fn accepted_version(&self) -> Option<&(u64, n2n::VersionData)> {
383 self.accepted_version.as_ref()
384 }
385
386 pub async fn abort(self) {
388 self.plexer.abort().await
389 }
390}
391
/// Client side of a node-to-client connection, bundling one client per
/// mini-protocol subscribed on a single multiplexed bearer.
pub struct NodeClient {
    /// Handle to the running multiplexer that carries every channel below.
    plexer: RunningPlexer,
    /// Handshake client.
    handshake: handshake::N2CClient,
    /// Chain-sync client.
    chainsync: chainsync::N2CClient,
    /// Local state query client.
    statequery: localstate::Client,
    /// Local tx-submission client.
    submission: localtxsubmission::Client,
    /// Tx-monitor client.
    monitor: txmonitor::Client,
}
401
402impl NodeClient {
403 pub fn new(bearer: Bearer) -> Self {
405 let mut plexer = multiplexer::Plexer::new(bearer);
406
407 let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
408 let cs_channel = plexer.subscribe_client(PROTOCOL_N2C_CHAIN_SYNC);
409 let sq_channel = plexer.subscribe_client(PROTOCOL_N2C_STATE_QUERY);
410 let tx_channel = plexer.subscribe_client(PROTOCOL_N2C_TX_SUBMISSION);
411 let mo_channel = plexer.subscribe_client(PROTOCOL_N2C_TX_MONITOR);
412
413 let plexer = plexer.spawn();
414
415 Self {
416 plexer,
417 handshake: handshake::Client::new(hs_channel),
418 chainsync: chainsync::Client::new(cs_channel),
419 statequery: localstate::Client::new(sq_channel),
420 submission: localtxsubmission::Client::new(tx_channel),
421 monitor: txmonitor::Client::new(mo_channel),
422 }
423 }
424
425 #[cfg(unix)]
427 pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
428 let bearer = Bearer::connect_unix(path)
429 .await
430 .map_err(Error::ConnectFailure)?;
431
432 let mut client = Self::new(bearer);
433
434 let versions = handshake::n2c::VersionTable::v10_and_above(magic);
435
436 let handshake = client
437 .handshake()
438 .handshake(versions)
439 .await
440 .map_err(Error::HandshakeProtocol)?;
441
442 if let handshake::Confirmation::Rejected(reason) = handshake {
443 error!(?reason, "handshake refused");
444 return Err(Error::IncompatibleVersion);
445 }
446
447 Ok(client)
448 }
449
450 #[cfg(windows)]
452 pub async fn connect(
453 pipe_name: impl AsRef<std::ffi::OsStr>,
454 magic: u64,
455 ) -> Result<Self, Error> {
456 let pipe_name = pipe_name.as_ref().to_os_string();
457
458 let bearer = tokio::task::spawn_blocking(move || Bearer::connect_named_pipe(pipe_name))
459 .await
460 .expect("can't join tokio thread")
461 .map_err(Error::ConnectFailure)?;
462
463 let mut client = Self::new(bearer);
464
465 let versions = handshake::n2c::VersionTable::v10_and_above(magic);
466
467 let handshake = client
468 .handshake()
469 .handshake(versions)
470 .await
471 .map_err(Error::HandshakeProtocol)?;
472
473 if let handshake::Confirmation::Rejected(reason) = handshake {
474 error!(?reason, "handshake refused");
475 return Err(Error::IncompatibleVersion);
476 }
477
478 Ok(client)
479 }
480
481 #[cfg(unix)]
484 pub async fn handshake_query(
485 bearer: Bearer,
486 magic: u64,
487 ) -> Result<handshake::n2c::VersionTable, Error> {
488 let mut plexer = multiplexer::Plexer::new(bearer);
489
490 let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
491
492 let plexer = plexer.spawn();
493
494 let versions = handshake::n2c::VersionTable::v15_with_query(magic);
495 let mut client = handshake::Client::new(hs_channel);
496
497 let handshake = client
498 .handshake(versions)
499 .await
500 .map_err(Error::HandshakeProtocol)?;
501
502 match handshake {
503 Confirmation::Accepted(_, _) => {
504 error!("handshake accepted when we expected query reply");
505 Err(Error::HandshakeProtocol(handshake::Error::InvalidInbound))
506 }
507 Confirmation::Rejected(reason) => {
508 error!(?reason, "handshake refused");
509 Err(Error::IncompatibleVersion)
510 }
511 Confirmation::QueryReply(version_table) => {
512 plexer.abort().await;
513 Ok(version_table)
514 }
515 }
516 }
517
518 pub fn handshake(&mut self) -> &mut handshake::N2CClient {
520 &mut self.handshake
521 }
522
523 pub fn chainsync(&mut self) -> &mut chainsync::N2CClient {
525 &mut self.chainsync
526 }
527
528 pub fn statequery(&mut self) -> &mut localstate::Client {
530 &mut self.statequery
531 }
532
533 pub fn submission(&mut self) -> &mut localtxsubmission::Client {
535 &mut self.submission
536 }
537
538 pub fn monitor(&mut self) -> &mut txmonitor::Client {
540 &mut self.monitor
541 }
542
543 pub async fn abort(self) {
545 self.plexer.abort().await
546 }
547}
548
549#[cfg(unix)]
551pub struct NodeServer {
552 pub plexer: RunningPlexer,
554 pub handshake: handshake::N2CServer,
556 pub chainsync: chainsync::N2CServer,
558 pub statequery: localstate::Server,
560 pub localtxsubmission: localtxsubmission::Server,
562 accepted_address: Option<UnixSocketAddr>,
563 accpeted_version: Option<(VersionNumber, n2c::VersionData)>,
564}
565
566#[cfg(unix)]
567impl NodeServer {
568 pub async fn new(bearer: Bearer) -> Self {
570 let mut plexer = multiplexer::Plexer::new(bearer);
571
572 let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE);
573 let cs_channel = plexer.subscribe_server(PROTOCOL_N2C_CHAIN_SYNC);
574 let sq_channel = plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY);
575 let localtx_channel = plexer.subscribe_server(PROTOCOL_N2C_TX_SUBMISSION);
576
577 let server_hs = handshake::Server::<n2c::VersionData>::new(hs_channel);
578 let server_cs = chainsync::N2CServer::new(cs_channel);
579 let server_sq = localstate::Server::new(sq_channel);
580 let server_localtx = localtxsubmission::Server::new(localtx_channel);
581
582 let plexer = plexer.spawn();
583
584 Self {
585 plexer,
586 handshake: server_hs,
587 chainsync: server_cs,
588 statequery: server_sq,
589 localtxsubmission: server_localtx,
590 accepted_address: None,
591 accpeted_version: None,
592 }
593 }
594
595 pub async fn accept(listener: &UnixListener, magic: u64) -> Result<Self, Error> {
597 let (bearer, address) = Bearer::accept_unix(listener)
598 .await
599 .map_err(Error::ConnectFailure)?;
600
601 let mut client = Self::new(bearer).await;
602
603 let accepted_version = client
604 .handshake()
605 .handshake(n2c::VersionTable::v10_and_above(magic))
606 .await
607 .map_err(Error::HandshakeProtocol)?;
608
609 if let Some(version) = accepted_version {
610 client.accepted_address = Some(address);
611 client.accpeted_version = Some(version);
612 Ok(client)
613 } else {
614 client.abort().await;
615 Err(Error::IncompatibleVersion)
616 }
617 }
618
619 pub fn handshake(&mut self) -> &mut handshake::N2CServer {
621 &mut self.handshake
622 }
623
624 pub fn chainsync(&mut self) -> &mut chainsync::N2CServer {
626 &mut self.chainsync
627 }
628
629 pub fn statequery(&mut self) -> &mut localstate::Server {
631 &mut self.statequery
632 }
633
634 pub fn localtxsubmission(&mut self) -> &mut localtxsubmission::Server {
636 &mut self.localtxsubmission
637 }
638
639 pub fn accepted_address(&self) -> Option<&UnixSocketAddr> {
641 self.accepted_address.as_ref()
642 }
643
644 pub fn accepted_version(&self) -> Option<&(u64, n2c::VersionData)> {
646 self.accpeted_version.as_ref()
647 }
648
649 pub async fn abort(self) {
651 self.plexer.abort().await
652 }
653}
654
/// Client side of a DMQ connection, bundling the handshake client with the
/// local message submission and notification clients on one multiplexed bearer.
pub struct DmqClient {
    /// Handle to the running multiplexer that carries every channel below.
    plexer: RunningPlexer,
    /// Handshake client.
    handshake: handshake::N2CClient,
    /// Local message submission client.
    msg_submission: localmsgsubmission::Client,
    /// Local message notification client.
    msg_notification: localmsgnotification::Client,
}
664
665impl DmqClient {
666 pub fn new(bearer: Bearer) -> Self {
668 let mut plexer = multiplexer::Plexer::new(bearer);
669
670 let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
671 let msg_submission_channel = plexer.subscribe_client(PROTOCOL_N2C_MSG_SUBMISSION);
672 let msg_notification_channel = plexer.subscribe_client(PROTOCOL_N2C_MSG_NOTIFICATION);
673
674 let plexer = plexer.spawn();
675
676 Self {
677 plexer,
678 handshake: handshake::Client::new(hs_channel),
679 msg_submission: localmsgsubmission::Client::new(msg_submission_channel),
680 msg_notification: localmsgnotification::Client::new(msg_notification_channel),
681 }
682 }
683
684 #[cfg(unix)]
686 pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
687 let bearer = Bearer::connect_unix(path)
688 .await
689 .map_err(Error::ConnectFailure)?;
690
691 let mut client = Self::new(bearer);
692
693 let versions = handshake::n2c::VersionTable::dmq(magic);
694
695 let handshake = client
696 .handshake()
697 .handshake(versions)
698 .await
699 .map_err(Error::HandshakeProtocol)?;
700
701 if let handshake::Confirmation::Rejected(reason) = handshake {
702 error!(?reason, "handshake refused");
703 return Err(Error::IncompatibleVersion);
704 }
705
706 Ok(client)
707 }
708
709 #[cfg(windows)]
711 pub async fn connect(
712 pipe_name: impl AsRef<std::ffi::OsStr>,
713 magic: u64,
714 ) -> Result<Self, Error> {
715 let pipe_name = pipe_name.as_ref().to_os_string();
716
717 let bearer = tokio::task::spawn_blocking(move || Bearer::connect_named_pipe(pipe_name))
718 .await
719 .expect("can't join tokio thread")
720 .map_err(Error::ConnectFailure)?;
721
722 let mut client = Self::new(bearer);
723
724 let versions = handshake::n2c::VersionTable::v10_and_above(magic);
725
726 let handshake = client
727 .handshake()
728 .handshake(versions)
729 .await
730 .map_err(Error::HandshakeProtocol)?;
731
732 if let handshake::Confirmation::Rejected(reason) = handshake {
733 error!(?reason, "handshake refused");
734 return Err(Error::IncompatibleVersion);
735 }
736
737 Ok(client)
738 }
739
740 #[cfg(unix)]
742 pub async fn handshake_query(
743 bearer: Bearer,
744 magic: u64,
745 ) -> Result<handshake::n2c::VersionTable, Error> {
746 let mut plexer = multiplexer::Plexer::new(bearer);
747
748 let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
749
750 let plexer = plexer.spawn();
751
752 let versions = handshake::n2c::VersionTable::dmq(magic);
753 let mut client = handshake::Client::new(hs_channel);
754
755 let handshake = client
756 .handshake(versions)
757 .await
758 .map_err(Error::HandshakeProtocol)?;
759
760 match handshake {
761 Confirmation::Accepted(_, _) => {
762 error!("handshake accepted when we expected query reply");
763 Err(Error::HandshakeProtocol(handshake::Error::InvalidInbound))
764 }
765 Confirmation::Rejected(reason) => {
766 error!(?reason, "handshake refused");
767 Err(Error::IncompatibleVersion)
768 }
769 Confirmation::QueryReply(version_table) => {
770 plexer.abort().await;
771 Ok(version_table)
772 }
773 }
774 }
775
776 pub fn handshake(&mut self) -> &mut handshake::N2CClient {
778 &mut self.handshake
779 }
780
781 pub fn msg_submission(&mut self) -> &mut localmsgsubmission::Client {
783 &mut self.msg_submission
784 }
785
786 pub fn msg_notification(&mut self) -> &mut localmsgnotification::Client {
788 &mut self.msg_notification
789 }
790
791 pub async fn abort(self) {
793 self.plexer.abort().await
794 }
795}
796
797#[cfg(unix)]
801pub struct DmqServer {
802 pub plexer: RunningPlexer,
804 pub handshake: handshake::N2CServer,
806 pub msg_notification: localmsgnotification::Server,
808 pub msg_submission: localmsgsubmission::Server,
810 accepted_address: Option<UnixSocketAddr>,
811 accpeted_version: Option<(VersionNumber, n2c::VersionData)>,
812}
813
814#[cfg(unix)]
815impl DmqServer {
816 pub async fn new(bearer: Bearer) -> Self {
818 let mut plexer = multiplexer::Plexer::new(bearer);
819
820 let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE);
821 let msg_notification_channel = plexer.subscribe_server(PROTOCOL_N2C_MSG_NOTIFICATION);
822 let msg_submission_channel = plexer.subscribe_server(PROTOCOL_N2C_MSG_SUBMISSION);
823
824 let server_hs = handshake::Server::<n2c::VersionData>::new(hs_channel);
825 let server_msg_notification = localmsgnotification::Server::new(msg_notification_channel);
826 let server_msg_submission = localmsgsubmission::Server::new(msg_submission_channel);
827
828 let plexer = plexer.spawn();
829
830 Self {
831 plexer,
832 handshake: server_hs,
833 msg_notification: server_msg_notification,
834 msg_submission: server_msg_submission,
835 accepted_address: None,
836 accpeted_version: None,
837 }
838 }
839
840 pub async fn accept(listener: &UnixListener, magic: u64) -> Result<Self, Error> {
842 let (bearer, address) = Bearer::accept_unix(listener)
843 .await
844 .map_err(Error::ConnectFailure)?;
845
846 let mut client = Self::new(bearer).await;
847
848 let accepted_version = client
849 .handshake()
850 .handshake(n2c::VersionTable::dmq(magic))
851 .await
852 .map_err(Error::HandshakeProtocol)?;
853
854 if let Some(version) = accepted_version {
855 client.accepted_address = Some(address);
856 client.accpeted_version = Some(version);
857 Ok(client)
858 } else {
859 client.abort().await;
860 Err(Error::IncompatibleVersion)
861 }
862 }
863
864 pub fn handshake(&mut self) -> &mut handshake::N2CServer {
866 &mut self.handshake
867 }
868
869 pub fn msg_notification(&mut self) -> &mut localmsgnotification::Server {
871 &mut self.msg_notification
872 }
873
874 pub fn msg_submission(&mut self) -> &mut localmsgsubmission::Server {
876 &mut self.msg_submission
877 }
878
879 pub fn accepted_address(&self) -> Option<&UnixSocketAddr> {
881 self.accepted_address.as_ref()
882 }
883
884 pub fn accepted_version(&self) -> Option<&(u64, n2c::VersionData)> {
886 self.accpeted_version.as_ref()
887 }
888
889 pub async fn abort(self) {
891 self.plexer.abort().await
892 }
893}