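//! `pallas_network/facades.rs`: high-level client and server facades that bundle the
//! Ouroboros mini-protocols over a single multiplexed bearer.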

1use std::net::SocketAddr;
2use std::path::Path;
3use std::time::Duration;
4use thiserror::Error;
5use tracing::{debug, error};
6
7use tokio::net::{TcpListener, ToSocketAddrs};
8
9#[cfg(unix)]
10use tokio::net::{UnixListener, unix::SocketAddr as UnixSocketAddr};
11
12use crate::miniprotocols::handshake::n2n::VersionData;
13use crate::miniprotocols::handshake::{Confirmation, VersionNumber, VersionTable, n2c, n2n};
14
15use crate::miniprotocols::{
16    PROTOCOL_N2C_CHAIN_SYNC, PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_MSG_NOTIFICATION,
17    PROTOCOL_N2C_MSG_SUBMISSION, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2C_TX_MONITOR,
18    PROTOCOL_N2C_TX_SUBMISSION, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC,
19    PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE, PROTOCOL_N2N_PEER_SHARING,
20    PROTOCOL_N2N_TX_SUBMISSION, blockfetch, chainsync, handshake, keepalive, localmsgnotification,
21    localmsgsubmission, localstate, localtxsubmission, peersharing, txmonitor, txsubmission,
22};
23
24use crate::multiplexer::{self, Bearer, RunningPlexer};
25
/// Errors produced by the high-level peer/node facades.
///
/// Connection setup, handshake negotiation, and the background keep-alive
/// loops all report their failures through this type.
#[derive(Debug, Error)]
pub enum Error {
    /// Underlying multiplexer failure.
    #[error("error in multiplexer")]
    PlexerFailure(#[source] multiplexer::Error),

    /// Failed to open or accept the bearer connection.
    #[error("error connecting bearer")]
    ConnectFailure(#[source] tokio::io::Error),

    // NOTE(review): unlike the two variants above, the payloads of the next
    // three variants are not marked `#[source]`, so they are presumably not
    // reachable through `std::error::Error::source()` — confirm whether that
    // is intentional.
    /// Handshake mini-protocol error.
    #[error("handshake protocol error")]
    HandshakeProtocol(handshake::Error),

    /// Keep-alive loop reported a client-side error.
    #[error("keepalive client loop error")]
    KeepAliveClientLoop(keepalive::ClientError),

    /// Keep-alive loop reported a server-side error.
    #[error("keepalive server loop error")]
    KeepAliveServerLoop(keepalive::ServerError),

    /// The remote rejected every version offered in the handshake.
    #[error("handshake version not accepted")]
    IncompatibleVersion,
}
53
/// Default interval, in seconds, between keep-alive pings.
///
/// `PeerClient::connect` uses this value for the client-side keep-alive loop
/// it spawns.
pub const DEFAULT_KEEP_ALIVE_INTERVAL_SEC: u64 = 20;
56
/// Handle to a spawned keep-alive loop, as returned by `KeepAliveLoop::spawn`.
///
/// The task's own output is a `Result<(), Error>`; the run loops in this file
/// only ever return through their error path.
pub type KeepAliveHandle = tokio::task::JoinHandle<Result<(), Error>>;
59
/// A keep-alive loop on either side of the connection.
///
/// Build one with `KeepAliveLoop::client` or `KeepAliveLoop::server`, then
/// call `spawn` to run it as a background Tokio task.
pub enum KeepAliveLoop {
    /// Client-side loop: ping the peer at the given interval.
    Client(keepalive::Client, Duration),
    /// Server-side loop: respond to pings from the peer.
    Server(keepalive::Server),
}
67
68impl KeepAliveLoop {
69    /// Build a client-side loop that pings every `interval`.
70    pub fn client(client: keepalive::Client, interval: Duration) -> Self {
71        Self::Client(client, interval)
72    }
73
74    /// Build a server-side loop that responds to incoming pings.
75    pub fn server(server: keepalive::Server) -> Self {
76        Self::Server(server)
77    }
78
79    /// Drive a client-side keep-alive loop until it errors.
80    pub async fn run_client(
81        mut client: keepalive::Client,
82        interval: Duration,
83    ) -> Result<(), Error> {
84        let mut interval = tokio::time::interval(interval);
85
86        loop {
87            interval.tick().await;
88            debug!("sending keepalive request");
89
90            client
91                .keepalive_roundtrip()
92                .await
93                .map_err(Error::KeepAliveClientLoop)?;
94        }
95    }
96
97    /// Drive a server-side keep-alive loop until it errors.
98    pub async fn run_server(mut server: keepalive::Server) -> Result<(), Error> {
99        loop {
100            debug!("waiting keepalive request");
101
102            server
103                .keepalive_roundtrip()
104                .await
105                .map_err(Error::KeepAliveServerLoop)?;
106        }
107    }
108
109    /// Spawn the loop on the current Tokio runtime.
110    pub fn spawn(self) -> KeepAliveHandle {
111        match self {
112            KeepAliveLoop::Client(client, interval) => {
113                tokio::spawn(Self::run_client(client, interval))
114            }
115            KeepAliveLoop::Server(server) => tokio::spawn(Self::run_server(server)),
116        }
117    }
118}
119
120/// Node-to-node Ouroboros client. Bundles the chain-sync, block-fetch,
121/// tx-submission, peer-sharing, and keep-alive protocols over a single bearer.
122pub struct PeerClient {
123    /// Underlying running multiplexer.
124    pub plexer: RunningPlexer,
125    /// Handle to the spawned keep-alive task.
126    pub keepalive: KeepAliveHandle,
127    /// Chain-sync client (node-to-node).
128    pub chainsync: chainsync::N2NClient,
129    /// Block-fetch client.
130    pub blockfetch: blockfetch::Client,
131    /// Tx-submission client.
132    pub txsubmission: txsubmission::Client,
133    /// Peer-sharing client.
134    pub peersharing: peersharing::Client,
135}
136
137impl PeerClient {
138    /// Connect to `addr` and perform the N2N handshake using the given network magic.
139    pub async fn connect(addr: impl ToSocketAddrs, magic: u64) -> Result<Self, Error> {
140        let bearer = Bearer::connect_tcp(addr)
141            .await
142            .map_err(Error::ConnectFailure)?;
143
144        let mut plexer = multiplexer::Plexer::new(bearer);
145
146        let channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE);
147        let mut handshake = handshake::Client::new(channel);
148
149        let cs_channel = plexer.subscribe_client(PROTOCOL_N2N_CHAIN_SYNC);
150        let bf_channel = plexer.subscribe_client(PROTOCOL_N2N_BLOCK_FETCH);
151        let txsub_channel = plexer.subscribe_client(PROTOCOL_N2N_TX_SUBMISSION);
152        let peersharing_channel = plexer.subscribe_client(PROTOCOL_N2N_PEER_SHARING);
153
154        let channel = plexer.subscribe_client(PROTOCOL_N2N_KEEP_ALIVE);
155        let keepalive = keepalive::Client::new(channel);
156
157        let plexer = plexer.spawn();
158
159        let versions = handshake::n2n::VersionTable::v7_and_above(magic);
160
161        let handshake = handshake
162            .handshake(versions)
163            .await
164            .map_err(Error::HandshakeProtocol)?;
165
166        if let handshake::Confirmation::Rejected(reason) = handshake {
167            error!(?reason, "handshake refused");
168            return Err(Error::IncompatibleVersion);
169        }
170
171        let keepalive = KeepAliveLoop::client(
172            keepalive,
173            Duration::from_secs(DEFAULT_KEEP_ALIVE_INTERVAL_SEC),
174        )
175        .spawn();
176
177        let client = Self {
178            plexer,
179            keepalive,
180            chainsync: chainsync::Client::new(cs_channel),
181            blockfetch: blockfetch::Client::new(bf_channel),
182            txsubmission: txsubmission::Client::new(txsub_channel),
183            peersharing: peersharing::Client::new(peersharing_channel),
184        };
185
186        Ok(client)
187    }
188
189    /// Connect, issue a query-mode handshake, and return the peer's advertised
190    /// version table without keeping the connection alive.
191    pub async fn handshake_query(
192        addr: impl ToSocketAddrs,
193        magic: u64,
194    ) -> Result<VersionTable<VersionData>, Error> {
195        let bearer = Bearer::connect_tcp(addr)
196            .await
197            .map_err(Error::ConnectFailure)?;
198
199        let mut plexer = multiplexer::Plexer::new(bearer);
200
201        let channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE);
202        let mut handshake = handshake::Client::new(channel);
203
204        let _plexer = plexer.spawn();
205
206        let versions = handshake::n2n::VersionTable::v7_and_above_with_query(magic, true);
207
208        let handshake = handshake
209            .handshake(versions)
210            .await
211            .map_err(Error::HandshakeProtocol)?;
212
213        let version_table = match handshake {
214            handshake::Confirmation::QueryReply(version_table) => {
215                debug!("handshake query reply received");
216                version_table
217            }
218            handshake::Confirmation::Accepted(_, _) => {
219                error!("handshake accepted when we expected query reply");
220                return Err(Error::HandshakeProtocol(handshake::Error::InvalidInbound));
221            }
222            handshake::Confirmation::Rejected(reason) => {
223                error!(?reason, "handshake refused");
224                return Err(Error::IncompatibleVersion);
225            }
226        };
227
228        Ok(version_table)
229    }
230
231    /// Get mutable access to the chain-sync client.
232    pub fn chainsync(&mut self) -> &mut chainsync::N2NClient {
233        &mut self.chainsync
234    }
235
236    /// Run an operation against the chain-sync client on a background task.
237    pub async fn with_chainsync<T, O, Fut>(&mut self, op: T) -> tokio::task::JoinHandle<O>
238    where
239        T: FnOnce(&mut chainsync::N2NClient) -> Fut,
240        Fut: std::future::Future<Output = O> + Send + 'static,
241        O: Send + 'static,
242    {
243        tokio::spawn(op(&mut self.chainsync))
244    }
245
246    /// Get mutable access to the block-fetch client.
247    pub fn blockfetch(&mut self) -> &mut blockfetch::Client {
248        &mut self.blockfetch
249    }
250
251    /// Get mutable access to the tx-submission client.
252    pub fn txsubmission(&mut self) -> &mut txsubmission::Client {
253        &mut self.txsubmission
254    }
255
256    /// Get mutable access to the peer-sharing client.
257    pub fn peersharing(&mut self) -> &mut peersharing::Client {
258        &mut self.peersharing
259    }
260
261    /// Tear down the underlying multiplexer and abort all spawned tasks.
262    pub async fn abort(self) {
263        self.plexer.abort().await
264    }
265}
266
267/// Node-to-node Ouroboros server. Accepts a peer connection and exposes the
268/// server side of each mini-protocol carried over the bearer.
269pub struct PeerServer {
270    /// Underlying running multiplexer.
271    pub plexer: RunningPlexer,
272    /// Handshake server.
273    pub handshake: handshake::N2NServer,
274    /// Chain-sync server.
275    pub chainsync: chainsync::N2NServer,
276    /// Block-fetch server.
277    pub blockfetch: blockfetch::Server,
278    /// Tx-submission server.
279    pub txsubmission: txsubmission::Server,
280    /// Keep-alive server.
281    pub keepalive: keepalive::Server,
282    /// Peer-sharing server.
283    pub peersharing: peersharing::Server,
284    accepted_address: Option<SocketAddr>,
285    accepted_version: Option<(u64, n2n::VersionData)>,
286}
287
288impl PeerServer {
289    /// Build a server over an already-accepted bearer.
290    pub fn new(bearer: Bearer) -> Self {
291        let mut plexer = multiplexer::Plexer::new(bearer);
292
293        let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE);
294        let cs_channel = plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC);
295        let bf_channel = plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH);
296        let txsub_channel = plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION);
297        let keepalive_channel = plexer.subscribe_server(PROTOCOL_N2N_KEEP_ALIVE);
298        let peersharing_channel = plexer.subscribe_server(PROTOCOL_N2N_PEER_SHARING);
299
300        let hs = handshake::N2NServer::new(hs_channel);
301        let cs = chainsync::N2NServer::new(cs_channel);
302        let bf = blockfetch::Server::new(bf_channel);
303        let txsub = txsubmission::Server::new(txsub_channel);
304        let keepalive = keepalive::Server::new(keepalive_channel);
305        let peersharing = peersharing::Server::new(peersharing_channel);
306
307        let plexer = plexer.spawn();
308
309        Self {
310            plexer,
311            handshake: hs,
312            chainsync: cs,
313            blockfetch: bf,
314            txsubmission: txsub,
315            keepalive,
316            peersharing,
317            accepted_address: None,
318            accepted_version: None,
319        }
320    }
321
322    /// Accept the next connection from `listener` and complete the N2N handshake.
323    pub async fn accept(listener: &TcpListener, magic: u64) -> Result<Self, Error> {
324        let (bearer, address) = Bearer::accept_tcp(listener)
325            .await
326            .map_err(Error::ConnectFailure)?;
327
328        let mut client = Self::new(bearer);
329
330        let accepted_version = client
331            .handshake()
332            .handshake(n2n::VersionTable::v7_and_above(magic))
333            .await
334            .map_err(Error::HandshakeProtocol)?;
335
336        if let Some((version, data)) = accepted_version {
337            client.accepted_address = Some(address);
338            client.accepted_version = Some((version, data));
339            Ok(client)
340        } else {
341            client.abort().await;
342            Err(Error::IncompatibleVersion)
343        }
344    }
345
346    /// Get mutable access to the handshake server.
347    pub fn handshake(&mut self) -> &mut handshake::N2NServer {
348        &mut self.handshake
349    }
350
351    /// Get mutable access to the chain-sync server.
352    pub fn chainsync(&mut self) -> &mut chainsync::N2NServer {
353        &mut self.chainsync
354    }
355
356    /// Get mutable access to the block-fetch server.
357    pub fn blockfetch(&mut self) -> &mut blockfetch::Server {
358        &mut self.blockfetch
359    }
360
361    /// Get mutable access to the tx-submission server.
362    pub fn txsubmission(&mut self) -> &mut txsubmission::Server {
363        &mut self.txsubmission
364    }
365
366    /// Get mutable access to the keep-alive server.
367    pub fn keepalive(&mut self) -> &mut keepalive::Server {
368        &mut self.keepalive
369    }
370
371    /// Get mutable access to the peer-sharing server.
372    pub fn peersharing(&mut self) -> &mut peersharing::Server {
373        &mut self.peersharing
374    }
375
376    /// Remote socket address of the accepted peer.
377    pub fn accepted_address(&self) -> Option<&SocketAddr> {
378        self.accepted_address.as_ref()
379    }
380
381    /// Version number negotiated with the accepted peer plus its version data.
382    pub fn accepted_version(&self) -> Option<&(u64, n2n::VersionData)> {
383        self.accepted_version.as_ref()
384    }
385
386    /// Tear down the underlying multiplexer.
387    pub async fn abort(self) {
388        self.plexer.abort().await
389    }
390}
391
392/// Client of N2C Ouroboros
393pub struct NodeClient {
394    plexer: RunningPlexer,
395    handshake: handshake::N2CClient,
396    chainsync: chainsync::N2CClient,
397    statequery: localstate::Client,
398    submission: localtxsubmission::Client,
399    monitor: txmonitor::Client,
400}
401
402impl NodeClient {
403    /// Build a client over an already-opened bearer (does not perform the handshake).
404    pub fn new(bearer: Bearer) -> Self {
405        let mut plexer = multiplexer::Plexer::new(bearer);
406
407        let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
408        let cs_channel = plexer.subscribe_client(PROTOCOL_N2C_CHAIN_SYNC);
409        let sq_channel = plexer.subscribe_client(PROTOCOL_N2C_STATE_QUERY);
410        let tx_channel = plexer.subscribe_client(PROTOCOL_N2C_TX_SUBMISSION);
411        let mo_channel = plexer.subscribe_client(PROTOCOL_N2C_TX_MONITOR);
412
413        let plexer = plexer.spawn();
414
415        Self {
416            plexer,
417            handshake: handshake::Client::new(hs_channel),
418            chainsync: chainsync::Client::new(cs_channel),
419            statequery: localstate::Client::new(sq_channel),
420            submission: localtxsubmission::Client::new(tx_channel),
421            monitor: txmonitor::Client::new(mo_channel),
422        }
423    }
424
425    /// Connect to a Unix-domain node socket at `path` and perform the N2C handshake.
426    #[cfg(unix)]
427    pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
428        let bearer = Bearer::connect_unix(path)
429            .await
430            .map_err(Error::ConnectFailure)?;
431
432        let mut client = Self::new(bearer);
433
434        let versions = handshake::n2c::VersionTable::v10_and_above(magic);
435
436        let handshake = client
437            .handshake()
438            .handshake(versions)
439            .await
440            .map_err(Error::HandshakeProtocol)?;
441
442        if let handshake::Confirmation::Rejected(reason) = handshake {
443            error!(?reason, "handshake refused");
444            return Err(Error::IncompatibleVersion);
445        }
446
447        Ok(client)
448    }
449
450    /// Connect to a Windows named-pipe node socket and perform the N2C handshake.
451    #[cfg(windows)]
452    pub async fn connect(
453        pipe_name: impl AsRef<std::ffi::OsStr>,
454        magic: u64,
455    ) -> Result<Self, Error> {
456        let pipe_name = pipe_name.as_ref().to_os_string();
457
458        let bearer = tokio::task::spawn_blocking(move || Bearer::connect_named_pipe(pipe_name))
459            .await
460            .expect("can't join tokio thread")
461            .map_err(Error::ConnectFailure)?;
462
463        let mut client = Self::new(bearer);
464
465        let versions = handshake::n2c::VersionTable::v10_and_above(magic);
466
467        let handshake = client
468            .handshake()
469            .handshake(versions)
470            .await
471            .map_err(Error::HandshakeProtocol)?;
472
473        if let handshake::Confirmation::Rejected(reason) = handshake {
474            error!(?reason, "handshake refused");
475            return Err(Error::IncompatibleVersion);
476        }
477
478        Ok(client)
479    }
480
481    /// Issue a query-mode handshake over `bearer` and return the node's
482    /// advertised version table.
483    #[cfg(unix)]
484    pub async fn handshake_query(
485        bearer: Bearer,
486        magic: u64,
487    ) -> Result<handshake::n2c::VersionTable, Error> {
488        let mut plexer = multiplexer::Plexer::new(bearer);
489
490        let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
491
492        let plexer = plexer.spawn();
493
494        let versions = handshake::n2c::VersionTable::v15_with_query(magic);
495        let mut client = handshake::Client::new(hs_channel);
496
497        let handshake = client
498            .handshake(versions)
499            .await
500            .map_err(Error::HandshakeProtocol)?;
501
502        match handshake {
503            Confirmation::Accepted(_, _) => {
504                error!("handshake accepted when we expected query reply");
505                Err(Error::HandshakeProtocol(handshake::Error::InvalidInbound))
506            }
507            Confirmation::Rejected(reason) => {
508                error!(?reason, "handshake refused");
509                Err(Error::IncompatibleVersion)
510            }
511            Confirmation::QueryReply(version_table) => {
512                plexer.abort().await;
513                Ok(version_table)
514            }
515        }
516    }
517
518    /// Get mutable access to the handshake client.
519    pub fn handshake(&mut self) -> &mut handshake::N2CClient {
520        &mut self.handshake
521    }
522
523    /// Get mutable access to the chain-sync client (N2C).
524    pub fn chainsync(&mut self) -> &mut chainsync::N2CClient {
525        &mut self.chainsync
526    }
527
528    /// Get mutable access to the local-state-query client.
529    pub fn statequery(&mut self) -> &mut localstate::Client {
530        &mut self.statequery
531    }
532
533    /// Get mutable access to the local-tx-submission client.
534    pub fn submission(&mut self) -> &mut localtxsubmission::Client {
535        &mut self.submission
536    }
537
538    /// Get mutable access to the tx-monitor client.
539    pub fn monitor(&mut self) -> &mut txmonitor::Client {
540        &mut self.monitor
541    }
542
543    /// Tear down the underlying multiplexer.
544    pub async fn abort(self) {
545        self.plexer.abort().await
546    }
547}
548
549/// Node-to-client Ouroboros server (the node side of a local connection).
550#[cfg(unix)]
551pub struct NodeServer {
552    /// Underlying running multiplexer.
553    pub plexer: RunningPlexer,
554    /// Handshake server.
555    pub handshake: handshake::N2CServer,
556    /// Chain-sync server (N2C).
557    pub chainsync: chainsync::N2CServer,
558    /// Local-state-query server.
559    pub statequery: localstate::Server,
560    /// Local-tx-submission server.
561    pub localtxsubmission: localtxsubmission::Server,
562    accepted_address: Option<UnixSocketAddr>,
563    accpeted_version: Option<(VersionNumber, n2c::VersionData)>,
564}
565
566#[cfg(unix)]
567impl NodeServer {
568    /// Build a server over an already-accepted bearer.
569    pub async fn new(bearer: Bearer) -> Self {
570        let mut plexer = multiplexer::Plexer::new(bearer);
571
572        let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE);
573        let cs_channel = plexer.subscribe_server(PROTOCOL_N2C_CHAIN_SYNC);
574        let sq_channel = plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY);
575        let localtx_channel = plexer.subscribe_server(PROTOCOL_N2C_TX_SUBMISSION);
576
577        let server_hs = handshake::Server::<n2c::VersionData>::new(hs_channel);
578        let server_cs = chainsync::N2CServer::new(cs_channel);
579        let server_sq = localstate::Server::new(sq_channel);
580        let server_localtx = localtxsubmission::Server::new(localtx_channel);
581
582        let plexer = plexer.spawn();
583
584        Self {
585            plexer,
586            handshake: server_hs,
587            chainsync: server_cs,
588            statequery: server_sq,
589            localtxsubmission: server_localtx,
590            accepted_address: None,
591            accpeted_version: None,
592        }
593    }
594
595    /// Accept the next Unix-domain connection from `listener` and complete the N2C handshake.
596    pub async fn accept(listener: &UnixListener, magic: u64) -> Result<Self, Error> {
597        let (bearer, address) = Bearer::accept_unix(listener)
598            .await
599            .map_err(Error::ConnectFailure)?;
600
601        let mut client = Self::new(bearer).await;
602
603        let accepted_version = client
604            .handshake()
605            .handshake(n2c::VersionTable::v10_and_above(magic))
606            .await
607            .map_err(Error::HandshakeProtocol)?;
608
609        if let Some(version) = accepted_version {
610            client.accepted_address = Some(address);
611            client.accpeted_version = Some(version);
612            Ok(client)
613        } else {
614            client.abort().await;
615            Err(Error::IncompatibleVersion)
616        }
617    }
618
619    /// Get mutable access to the handshake server.
620    pub fn handshake(&mut self) -> &mut handshake::N2CServer {
621        &mut self.handshake
622    }
623
624    /// Get mutable access to the chain-sync server (N2C).
625    pub fn chainsync(&mut self) -> &mut chainsync::N2CServer {
626        &mut self.chainsync
627    }
628
629    /// Get mutable access to the local-state-query server.
630    pub fn statequery(&mut self) -> &mut localstate::Server {
631        &mut self.statequery
632    }
633
634    /// Get mutable access to the local-tx-submission server.
635    pub fn localtxsubmission(&mut self) -> &mut localtxsubmission::Server {
636        &mut self.localtxsubmission
637    }
638
639    /// Remote address of the accepted local client.
640    pub fn accepted_address(&self) -> Option<&UnixSocketAddr> {
641        self.accepted_address.as_ref()
642    }
643
644    /// Version negotiated with the accepted local client.
645    pub fn accepted_version(&self) -> Option<&(u64, n2c::VersionData)> {
646        self.accpeted_version.as_ref()
647    }
648
649    /// Tear down the underlying multiplexer.
650    pub async fn abort(self) {
651        self.plexer.abort().await
652    }
653}
654
655/// Client of N2C DMQ (Decentralized Message Queue)
656///
657/// Described in [CIP-0137](https://github.com/cardano-foundation/CIPs/tree/master/CIP-0137)
658pub struct DmqClient {
659    plexer: RunningPlexer,
660    handshake: handshake::N2CClient,
661    msg_submission: localmsgsubmission::Client,
662    msg_notification: localmsgnotification::Client,
663}
664
665impl DmqClient {
666    /// Build a DMQ client over an already-opened bearer.
667    pub fn new(bearer: Bearer) -> Self {
668        let mut plexer = multiplexer::Plexer::new(bearer);
669
670        let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
671        let msg_submission_channel = plexer.subscribe_client(PROTOCOL_N2C_MSG_SUBMISSION);
672        let msg_notification_channel = plexer.subscribe_client(PROTOCOL_N2C_MSG_NOTIFICATION);
673
674        let plexer = plexer.spawn();
675
676        Self {
677            plexer,
678            handshake: handshake::Client::new(hs_channel),
679            msg_submission: localmsgsubmission::Client::new(msg_submission_channel),
680            msg_notification: localmsgnotification::Client::new(msg_notification_channel),
681        }
682    }
683
684    /// Connect to a DMQ node socket at `path` and perform the DMQ handshake.
685    #[cfg(unix)]
686    pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
687        let bearer = Bearer::connect_unix(path)
688            .await
689            .map_err(Error::ConnectFailure)?;
690
691        let mut client = Self::new(bearer);
692
693        let versions = handshake::n2c::VersionTable::dmq(magic);
694
695        let handshake = client
696            .handshake()
697            .handshake(versions)
698            .await
699            .map_err(Error::HandshakeProtocol)?;
700
701        if let handshake::Confirmation::Rejected(reason) = handshake {
702            error!(?reason, "handshake refused");
703            return Err(Error::IncompatibleVersion);
704        }
705
706        Ok(client)
707    }
708
709    /// Connect to a DMQ node over a Windows named pipe and perform the handshake.
710    #[cfg(windows)]
711    pub async fn connect(
712        pipe_name: impl AsRef<std::ffi::OsStr>,
713        magic: u64,
714    ) -> Result<Self, Error> {
715        let pipe_name = pipe_name.as_ref().to_os_string();
716
717        let bearer = tokio::task::spawn_blocking(move || Bearer::connect_named_pipe(pipe_name))
718            .await
719            .expect("can't join tokio thread")
720            .map_err(Error::ConnectFailure)?;
721
722        let mut client = Self::new(bearer);
723
724        let versions = handshake::n2c::VersionTable::v10_and_above(magic);
725
726        let handshake = client
727            .handshake()
728            .handshake(versions)
729            .await
730            .map_err(Error::HandshakeProtocol)?;
731
732        if let handshake::Confirmation::Rejected(reason) = handshake {
733            error!(?reason, "handshake refused");
734            return Err(Error::IncompatibleVersion);
735        }
736
737        Ok(client)
738    }
739
740    /// Issue a DMQ query-mode handshake and return the node's advertised version table.
741    #[cfg(unix)]
742    pub async fn handshake_query(
743        bearer: Bearer,
744        magic: u64,
745    ) -> Result<handshake::n2c::VersionTable, Error> {
746        let mut plexer = multiplexer::Plexer::new(bearer);
747
748        let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
749
750        let plexer = plexer.spawn();
751
752        let versions = handshake::n2c::VersionTable::dmq(magic);
753        let mut client = handshake::Client::new(hs_channel);
754
755        let handshake = client
756            .handshake(versions)
757            .await
758            .map_err(Error::HandshakeProtocol)?;
759
760        match handshake {
761            Confirmation::Accepted(_, _) => {
762                error!("handshake accepted when we expected query reply");
763                Err(Error::HandshakeProtocol(handshake::Error::InvalidInbound))
764            }
765            Confirmation::Rejected(reason) => {
766                error!(?reason, "handshake refused");
767                Err(Error::IncompatibleVersion)
768            }
769            Confirmation::QueryReply(version_table) => {
770                plexer.abort().await;
771                Ok(version_table)
772            }
773        }
774    }
775
776    /// Get mutable access to the handshake client.
777    pub fn handshake(&mut self) -> &mut handshake::N2CClient {
778        &mut self.handshake
779    }
780
781    /// Get mutable access to the DMQ message-submission client.
782    pub fn msg_submission(&mut self) -> &mut localmsgsubmission::Client {
783        &mut self.msg_submission
784    }
785
786    /// Get mutable access to the DMQ message-notification client.
787    pub fn msg_notification(&mut self) -> &mut localmsgnotification::Client {
788        &mut self.msg_notification
789    }
790
791    /// Tear down the underlying multiplexer.
792    pub async fn abort(self) {
793        self.plexer.abort().await
794    }
795}
796
797/// Server of N2C DMQ (Decentralized Message Queue)
798///
799/// Described in [CIP-0137](https://github.com/cardano-foundation/CIPs/tree/master/CIP-0137)
800#[cfg(unix)]
801pub struct DmqServer {
802    /// Underlying running multiplexer.
803    pub plexer: RunningPlexer,
804    /// Handshake server.
805    pub handshake: handshake::N2CServer,
806    /// DMQ message-notification server.
807    pub msg_notification: localmsgnotification::Server,
808    /// DMQ message-submission server.
809    pub msg_submission: localmsgsubmission::Server,
810    accepted_address: Option<UnixSocketAddr>,
811    accpeted_version: Option<(VersionNumber, n2c::VersionData)>,
812}
813
814#[cfg(unix)]
815impl DmqServer {
816    /// Build a DMQ server over an already-accepted bearer.
817    pub async fn new(bearer: Bearer) -> Self {
818        let mut plexer = multiplexer::Plexer::new(bearer);
819
820        let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE);
821        let msg_notification_channel = plexer.subscribe_server(PROTOCOL_N2C_MSG_NOTIFICATION);
822        let msg_submission_channel = plexer.subscribe_server(PROTOCOL_N2C_MSG_SUBMISSION);
823
824        let server_hs = handshake::Server::<n2c::VersionData>::new(hs_channel);
825        let server_msg_notification = localmsgnotification::Server::new(msg_notification_channel);
826        let server_msg_submission = localmsgsubmission::Server::new(msg_submission_channel);
827
828        let plexer = plexer.spawn();
829
830        Self {
831            plexer,
832            handshake: server_hs,
833            msg_notification: server_msg_notification,
834            msg_submission: server_msg_submission,
835            accepted_address: None,
836            accpeted_version: None,
837        }
838    }
839
840    /// Accept the next DMQ connection from `listener` and complete the handshake.
841    pub async fn accept(listener: &UnixListener, magic: u64) -> Result<Self, Error> {
842        let (bearer, address) = Bearer::accept_unix(listener)
843            .await
844            .map_err(Error::ConnectFailure)?;
845
846        let mut client = Self::new(bearer).await;
847
848        let accepted_version = client
849            .handshake()
850            .handshake(n2c::VersionTable::dmq(magic))
851            .await
852            .map_err(Error::HandshakeProtocol)?;
853
854        if let Some(version) = accepted_version {
855            client.accepted_address = Some(address);
856            client.accpeted_version = Some(version);
857            Ok(client)
858        } else {
859            client.abort().await;
860            Err(Error::IncompatibleVersion)
861        }
862    }
863
864    /// Get mutable access to the handshake server.
865    pub fn handshake(&mut self) -> &mut handshake::N2CServer {
866        &mut self.handshake
867    }
868
869    /// Get mutable access to the DMQ message-notification server.
870    pub fn msg_notification(&mut self) -> &mut localmsgnotification::Server {
871        &mut self.msg_notification
872    }
873
874    /// Get mutable access to the DMQ message-submission server.
875    pub fn msg_submission(&mut self) -> &mut localmsgsubmission::Server {
876        &mut self.msg_submission
877    }
878
879    /// Remote address of the accepted DMQ client.
880    pub fn accepted_address(&self) -> Option<&UnixSocketAddr> {
881        self.accepted_address.as_ref()
882    }
883
884    /// Version negotiated with the accepted DMQ client.
885    pub fn accepted_version(&self) -> Option<&(u64, n2c::VersionData)> {
886        self.accpeted_version.as_ref()
887    }
888
889    /// Tear down the underlying multiplexer.
890    pub async fn abort(self) {
891        self.plexer.abort().await
892    }
893}