hydroflow_cli_integration/
lib.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use async_recursion::async_recursion;
10use async_trait::async_trait;
11use bytes::{Bytes, BytesMut};
12use futures::sink::Buffer;
13use futures::{ready, stream, Future, Sink, SinkExt, Stream};
14use pin_project::pin_project;
15use serde::{Deserialize, Serialize};
16use tokio::io;
17use tokio::net::{TcpListener, TcpStream};
18#[cfg(unix)]
19use tokio::net::{UnixListener, UnixStream};
20use tokio::task::JoinHandle;
21use tokio_stream::wrappers::TcpListenerStream;
22use tokio_stream::StreamExt;
23use tokio_util::codec::{Framed, LengthDelimitedCodec};
24
/// Initial configuration tuple: a map from string keys to the [`ServerBindConfig`] to bind for
/// each key, plus an optional string.
// NOTE(review): presumably the keys are port names and the `Option<String>` is extra metadata
// supplied by whoever launches the service -- confirm against the producer of this value.
pub type InitConfig = (HashMap<String, ServerBindConfig>, Option<String>);
26
/// Placeholder for `UnixStream` on non-Unix targets, where the tokio Unix socket types are not
/// imported; lets types that mention `UnixStream` still compile.
#[cfg(not(unix))]
#[allow(dead_code)]
type UnixStream = ();

/// Placeholder for `UnixListener` on non-Unix targets (see the `UnixStream` placeholder).
#[cfg(not(unix))]
#[allow(dead_code)]
type UnixListener = ();
34
/// Describes how to connect to a service which is listening on some port.
///
/// The description is plain serializable data; [`ServerPort::instantiate`] turns it into an
/// in-progress connection.
#[allow(unreachable_code)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ServerPort {
    /// Filesystem path of a Unix domain socket to connect to.
    UnixSocket(PathBuf),
    /// Address of a TCP listener to connect to.
    TcpPort(SocketAddr),
    /// A group of ports, each identified by a `u32` key.
    Demux(HashMap<u32, ServerPort>),
    /// A list of ports that are connected to together and treated as one merged source.
    Merge(Vec<ServerPort>),
    /// A port paired with a `u32` tag.
    Tagged(Box<ServerPort>, u32),
    /// A port with nothing behind it.
    Null,
}
46
47impl ServerPort {
48    pub fn instantiate(&self) -> ServerOrBound {
49        ServerOrBound::Server(self.into())
50    }
51}
52
/// A [`ServerPort`] whose connections have been started: the same tree shape, with socket
/// descriptions replaced by handles to the tokio tasks that perform the connection.
#[derive(Debug)]
pub enum RealizedServerPort {
    /// Task connecting to a Unix domain socket.
    UnixSocket(JoinHandle<io::Result<UnixStream>>),
    /// Task connecting to a TCP address.
    TcpPort(JoinHandle<io::Result<TcpStream>>),
    /// Realized ports keyed by demux id.
    Demux(HashMap<u32, RealizedServerPort>),
    /// Realized ports to be merged into one source.
    Merge(Vec<RealizedServerPort>),
    /// A realized port paired with its `u32` tag.
    Tagged(Box<RealizedServerPort>, u32),
    /// No connection at all.
    Null,
}
62
63impl From<&ServerPort> for RealizedServerPort {
64    fn from(port: &ServerPort) -> Self {
65        match port {
66            ServerPort::UnixSocket(path) => {
67                #[cfg(unix)]
68                {
69                    let bound = UnixStream::connect(path.clone());
70                    RealizedServerPort::UnixSocket(tokio::spawn(bound))
71                }
72
73                #[cfg(not(unix))]
74                {
75                    let _ = path;
76                    panic!("Unix sockets are not supported on this platform")
77                }
78            }
79            ServerPort::TcpPort(addr) => {
80                let addr_clone = *addr;
81                let bound = async_retry(
82                    move || TcpStream::connect(addr_clone),
83                    10,
84                    Duration::from_secs(1),
85                );
86                RealizedServerPort::TcpPort(tokio::spawn(bound))
87            }
88            ServerPort::Demux(bindings) => {
89                RealizedServerPort::Demux(bindings.iter().map(|(k, v)| (*k, v.into())).collect())
90            }
91            ServerPort::Merge(ports) => {
92                RealizedServerPort::Merge(ports.iter().map(|p| p.into()).collect())
93            }
94            ServerPort::Tagged(port, tag) => {
95                RealizedServerPort::Tagged(Box::new(port.as_ref().into()), *tag)
96            }
97            ServerPort::Null => RealizedServerPort::Null,
98        }
99    }
100}
101
/// Describes what kind of listening endpoint to create; [`ServerBindConfig::bind`] turns it
/// into a [`BoundConnection`] of the same shape.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ServerBindConfig {
    /// Listen on a Unix domain socket.
    UnixSocket,
    /// Listen on a TCP port.
    TcpPort(
        /// The host the port should be bound on.
        String,
    ),
    /// A group of bind configurations, each identified by a `u32` key.
    Demux(HashMap<u32, ServerBindConfig>),
    /// A list of bind configurations that are bound together.
    Merge(Vec<ServerBindConfig>),
    /// A bind configuration paired with a `u32` tag.
    Tagged(Box<ServerBindConfig>, u32),
    /// Bind nothing.
    Null,
}
114
115impl ServerBindConfig {
116    #[async_recursion]
117    pub async fn bind(self) -> BoundConnection {
118        match self {
119            ServerBindConfig::UnixSocket => {
120                #[cfg(unix)]
121                {
122                    let dir = tempfile::tempdir().unwrap();
123                    let socket_path = dir.path().join("socket");
124                    let bound = UnixListener::bind(socket_path).unwrap();
125                    BoundConnection::UnixSocket(
126                        tokio::spawn(async move { Ok(bound.accept().await?.0) }),
127                        dir,
128                    )
129                }
130
131                #[cfg(not(unix))]
132                {
133                    panic!("Unix sockets are not supported on this platform")
134                }
135            }
136            ServerBindConfig::TcpPort(host) => {
137                let listener = TcpListener::bind((host, 0)).await.unwrap();
138                let addr = listener.local_addr().unwrap();
139                BoundConnection::TcpPort(TcpListenerStream::new(listener), addr)
140            }
141            ServerBindConfig::Demux(bindings) => {
142                let mut demux = HashMap::new();
143                for (key, bind) in bindings {
144                    demux.insert(key, bind.bind().await);
145                }
146                BoundConnection::Demux(demux)
147            }
148            ServerBindConfig::Merge(bindings) => {
149                let mut merge = Vec::new();
150                for bind in bindings {
151                    merge.push(bind.bind().await);
152                }
153                BoundConnection::Merge(merge)
154            }
155            ServerBindConfig::Tagged(underlying, id) => {
156                BoundConnection::Tagged(Box::new(underlying.bind().await), id)
157            }
158            ServerBindConfig::Null => BoundConnection::Null,
159        }
160    }
161}
162
/// Either side of a connection that is still being established.
#[derive(Debug)]
pub enum ServerOrBound {
    /// The connecting side: outgoing connections that have been started.
    Server(RealizedServerPort),
    /// The listening side: endpoints that have been bound and wait for a peer.
    Bound(BoundConnection),
}
168
169impl ServerOrBound {
170    pub async fn connect<T: Connected>(self) -> T {
171        T::from_defn(self).await
172    }
173
174    pub fn connect_local_blocking<T: Connected>(self) -> T {
175        let handle = tokio::runtime::Handle::current();
176        let _guard = handle.enter();
177        futures::executor::block_on(T::from_defn(self))
178    }
179
180    pub async fn accept_tcp(&mut self) -> TcpStream {
181        if let ServerOrBound::Bound(BoundConnection::TcpPort(handle, _)) = self {
182            handle.next().await.unwrap().unwrap()
183        } else {
184            panic!("Not a TCP port")
185        }
186    }
187}
188
/// A boxed, pinned, type-erased stream yielding `Result<BytesMut, io::Error>` items.
pub type DynStream = Pin<Box<dyn Stream<Item = Result<BytesMut, io::Error>> + Send + Sync>>;

/// A boxed, pinned, type-erased sink that accepts `Input` items and fails with `io::Error`.
pub type DynSink<Input> = Pin<Box<dyn Sink<Input, Error = io::Error> + Send + Sync>>;

/// A bidirectional byte channel: both a stream of `Result<BytesMut, io::Error>` and a sink of
/// `Bytes`.
pub trait StreamSink:
    Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>
{
}
// Blanket impl: every type that meets the supertrait bounds is automatically a `StreamSink`.
impl<T: Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>> StreamSink
    for T
{
}

/// A boxed, pinned, type-erased [`StreamSink`].
pub type DynStreamSink = Pin<Box<dyn StreamSink + Send + Sync>>;
203
/// A connection type that can be built asynchronously from a [`ServerOrBound`].
#[async_trait]
pub trait Connected: Send {
    /// Completes the connection described by `pipe` and wraps it as `Self`.
    async fn from_defn(pipe: ServerOrBound) -> Self;
}
208
/// A connection that can be consumed to obtain its sending half.
pub trait ConnectedSink {
    /// Item type accepted by the sink.
    type Input: Send;
    /// Concrete sink type; its error type is always `io::Error`.
    type Sink: Sink<Self::Input, Error = io::Error> + Send + Sync;

    /// Consumes the connection and returns its sink.
    fn into_sink(self) -> Self::Sink;
}
215
/// A connection that can be consumed to obtain its receiving half.
pub trait ConnectedSource {
    /// Payload type carried by successfully received items.
    type Output: Send;
    /// Concrete stream type; items are `Result<Self::Output, io::Error>`.
    type Stream: Stream<Item = Result<Self::Output, io::Error>> + Send + Sync;
    /// Consumes the connection and returns its stream.
    fn into_source(self) -> Self::Stream;
}
221
/// A listening endpoint (or tree of endpoints) produced by [`ServerBindConfig::bind`].
#[derive(Debug)]
pub enum BoundConnection {
    /// Task that accepts one connection on a Unix socket, together with the temporary
    /// directory that contains the socket file.
    UnixSocket(JoinHandle<io::Result<UnixStream>>, tempfile::TempDir),
    /// Stream of incoming TCP connections and the local address the listener is bound to.
    TcpPort(TcpListenerStream, SocketAddr),
    /// Bound endpoints keyed by demux id.
    Demux(HashMap<u32, BoundConnection>),
    /// Bound endpoints to be merged into one source.
    Merge(Vec<BoundConnection>),
    /// A bound endpoint paired with its `u32` tag.
    Tagged(Box<BoundConnection>, u32),
    /// No endpoint at all.
    Null,
}
231
232impl BoundConnection {
233    pub fn sink_port(&self) -> ServerPort {
234        match self {
235            BoundConnection::UnixSocket(_, tempdir) => {
236                #[cfg(unix)]
237                {
238                    ServerPort::UnixSocket(tempdir.path().join("socket"))
239                }
240
241                #[cfg(not(unix))]
242                {
243                    let _ = tempdir;
244                    panic!("Unix sockets are not supported on this platform")
245                }
246            }
247            BoundConnection::TcpPort(_, addr) => {
248                ServerPort::TcpPort(SocketAddr::new(addr.ip(), addr.port()))
249            }
250
251            BoundConnection::Demux(bindings) => {
252                let mut demux = HashMap::new();
253                for (key, bind) in bindings {
254                    demux.insert(*key, bind.sink_port());
255                }
256                ServerPort::Demux(demux)
257            }
258
259            BoundConnection::Merge(bindings) => {
260                let mut merge = Vec::new();
261                for bind in bindings {
262                    merge.push(bind.sink_port());
263                }
264                ServerPort::Merge(merge)
265            }
266
267            BoundConnection::Tagged(underlying, id) => {
268                ServerPort::Tagged(Box::new(underlying.sink_port()), *id)
269            }
270
271            BoundConnection::Null => ServerPort::Null,
272        }
273    }
274}
275
276#[async_recursion]
277async fn accept(bound: BoundConnection) -> ConnectedDirect {
278    match bound {
279        BoundConnection::UnixSocket(listener, _) => {
280            #[cfg(unix)]
281            {
282                let stream = listener.await.unwrap().unwrap();
283                ConnectedDirect {
284                    stream_sink: Some(Box::pin(unix_bytes(stream))),
285                    source_only: None,
286                    sink_only: None,
287                }
288            }
289
290            #[cfg(not(unix))]
291            {
292                drop(listener);
293                panic!("Unix sockets are not supported on this platform")
294            }
295        }
296        BoundConnection::TcpPort(mut listener, _) => {
297            let stream = listener.next().await.unwrap().unwrap();
298            ConnectedDirect {
299                stream_sink: Some(Box::pin(tcp_bytes(stream))),
300                source_only: None,
301                sink_only: None,
302            }
303        }
304        BoundConnection::Merge(merge) => {
305            let mut sources = vec![];
306            for bound in merge {
307                sources.push(accept(bound).await.into_source());
308            }
309
310            let merge_source: DynStream = Box::pin(MergeSource {
311                marker: PhantomData,
312                sources,
313            });
314
315            ConnectedDirect {
316                stream_sink: None,
317                source_only: Some(merge_source),
318                sink_only: None,
319            }
320        }
321        BoundConnection::Demux(_) => panic!("Cannot connect to a demux pipe directly"),
322        BoundConnection::Tagged(_, _) => panic!("Cannot connect to a tagged pipe directly"),
323        BoundConnection::Null => {
324            ConnectedDirect::from_defn(ServerOrBound::Server(RealizedServerPort::Null)).await
325        }
326    }
327}
328
/// Wraps a TCP stream in a length-delimited frame codec, yielding a [`StreamSink`].
fn tcp_bytes(stream: TcpStream) -> impl StreamSink {
    Framed::new(stream, LengthDelimitedCodec::new())
}

/// Wraps a Unix stream in a length-delimited frame codec, yielding a [`StreamSink`].
#[cfg(unix)]
fn unix_bytes(stream: UnixStream) -> impl StreamSink {
    Framed::new(stream, LengthDelimitedCodec::new())
}
337
/// A sink that accepts every item and throws it away. It never fails; its error type is
/// `io::Error` only so that it fits where an `io::Error` sink is expected.
struct IoErrorDrain<T> {
    // Ties the sink to its item type without storing any item.
    marker: PhantomData<T>,
}

impl<T> Sink<T> for IoErrorDrain<T> {
    type Error = io::Error;

    /// Always ready.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Discards `_item`.
    fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Nothing is buffered, so flushing completes immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Closing completes immediately.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
361
362async fn async_retry<T, E, F: Future<Output = Result<T, E>>>(
363    thunk: impl Fn() -> F,
364    count: usize,
365    delay: Duration,
366) -> Result<T, E> {
367    for _ in 1..count {
368        let result = thunk().await;
369        if result.is_ok() {
370            return result;
371        } else {
372            tokio::time::sleep(delay).await;
373        }
374    }
375
376    thunk().await
377}
378
/// An established connection carrying byte frames.
///
/// A socket connection fills `stream_sink`; connections without a duplex channel (merged or
/// null ports) fill `source_only` and/or `sink_only` instead.
pub struct ConnectedDirect {
    /// Duplex channel, usable as either the source or the sink.
    stream_sink: Option<DynStreamSink>,
    /// Receiving half, used when there is no duplex channel.
    source_only: Option<DynStream>,
    /// Sending half, used when there is no duplex channel.
    sink_only: Option<DynSink<Bytes>>,
}
384
385#[async_trait]
386impl Connected for ConnectedDirect {
387    async fn from_defn(pipe: ServerOrBound) -> Self {
388        match pipe {
389            ServerOrBound::Server(RealizedServerPort::UnixSocket(stream)) => {
390                #[cfg(unix)]
391                {
392                    let stream = stream.await.unwrap().unwrap();
393                    ConnectedDirect {
394                        stream_sink: Some(Box::pin(unix_bytes(stream))),
395                        source_only: None,
396                        sink_only: None,
397                    }
398                }
399
400                #[cfg(not(unix))]
401                {
402                    drop(stream);
403                    panic!("Unix sockets are not supported on this platform");
404                }
405            }
406            ServerOrBound::Server(RealizedServerPort::TcpPort(stream)) => {
407                let stream = stream.await.unwrap().unwrap();
408                stream.set_nodelay(true).unwrap();
409                ConnectedDirect {
410                    stream_sink: Some(Box::pin(tcp_bytes(stream))),
411                    source_only: None,
412                    sink_only: None,
413                }
414            }
415            ServerOrBound::Server(RealizedServerPort::Merge(merge)) => {
416                let sources = futures::future::join_all(merge.into_iter().map(|port| async {
417                    ConnectedDirect::from_defn(ServerOrBound::Server(port))
418                        .await
419                        .into_source()
420                }))
421                .await;
422
423                let merged = MergeSource {
424                    marker: PhantomData,
425                    sources,
426                };
427
428                ConnectedDirect {
429                    stream_sink: None,
430                    source_only: Some(Box::pin(merged)),
431                    sink_only: None,
432                }
433            }
434            ServerOrBound::Server(RealizedServerPort::Demux(_)) => {
435                panic!("Cannot connect to a demux pipe directly")
436            }
437
438            ServerOrBound::Server(RealizedServerPort::Tagged(_, _)) => {
439                panic!("Cannot connect to a tagged pipe directly")
440            }
441
442            ServerOrBound::Server(RealizedServerPort::Null) => ConnectedDirect {
443                stream_sink: None,
444                source_only: Some(Box::pin(stream::empty())),
445                sink_only: Some(Box::pin(IoErrorDrain {
446                    marker: PhantomData,
447                })),
448            },
449
450            ServerOrBound::Bound(bound) => accept(bound).await,
451        }
452    }
453}
454
455impl ConnectedSource for ConnectedDirect {
456    type Output = BytesMut;
457    type Stream = DynStream;
458
459    fn into_source(mut self) -> DynStream {
460        if let Some(s) = self.stream_sink.take() {
461            Box::pin(s)
462        } else {
463            self.source_only.take().unwrap()
464        }
465    }
466}
467
468impl ConnectedSink for ConnectedDirect {
469    type Input = Bytes;
470    type Sink = DynSink<Bytes>;
471
472    fn into_sink(mut self) -> DynSink<Self::Input> {
473        if let Some(s) = self.stream_sink.take() {
474            Box::pin(s)
475        } else {
476            self.sink_only.take().unwrap()
477        }
478    }
479}
480
/// A [`DemuxDrain`] whose per-key sinks are wrapped in a `futures` `Buffer`.
pub type BufferedDrain<S, I> = DemuxDrain<I, Buffer<S, I>>;

/// A connected demux port: one buffered sink per `u32` key.
pub struct ConnectedDemux<T: ConnectedSink>
where
    <T as ConnectedSink>::Input: Sync,
{
    /// Keys of the demux branches that were connected.
    pub keys: Vec<u32>,
    // Set on construction and moved out by `into_sink`.
    sink: Option<BufferedDrain<T::Sink, T::Input>>,
}
490
/// A sink of `(u32, T)` pairs that forwards each item to the inner sink registered under the
/// item's key.
#[pin_project]
pub struct DemuxDrain<T, S: Sink<T, Error = io::Error> + Send + Sync + ?Sized> {
    // Ties the drain to its payload type without storing any payload.
    marker: PhantomData<T>,
    // Inner sinks by key; each is individually boxed and pinned.
    #[pin]
    sinks: HashMap<u32, Pin<Box<S>>>,
}
497
498impl<T, S: Sink<T, Error = io::Error> + Send + Sync> Sink<(u32, T)> for DemuxDrain<T, S> {
499    type Error = io::Error;
500
501    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
502        for sink in self.project().sinks.values_mut() {
503            ready!(Sink::poll_ready(sink.as_mut(), _cx))?;
504        }
505
506        Poll::Ready(Ok(()))
507    }
508
509    fn start_send(self: Pin<&mut Self>, item: (u32, T)) -> Result<(), Self::Error> {
510        Sink::start_send(
511            self.project()
512                .sinks
513                .get_mut()
514                .get_mut(&item.0)
515                .unwrap_or_else(|| panic!("No sink in this demux for key {}", item.0))
516                .as_mut(),
517            item.1,
518        )
519    }
520
521    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
522        for sink in self.project().sinks.values_mut() {
523            ready!(Sink::poll_flush(sink.as_mut(), _cx))?;
524        }
525
526        Poll::Ready(Ok(()))
527    }
528
529    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
530        for sink in self.project().sinks.values_mut() {
531            ready!(Sink::poll_close(sink.as_mut(), _cx))?;
532        }
533
534        Poll::Ready(Ok(()))
535    }
536}
537
538#[async_trait]
539impl<T: Connected + ConnectedSink> Connected for ConnectedDemux<T>
540where
541    <T as ConnectedSink>::Input: 'static + Sync,
542{
543    async fn from_defn(pipe: ServerOrBound) -> Self {
544        match pipe {
545            ServerOrBound::Server(RealizedServerPort::Demux(demux)) => {
546                let mut connected_demux = HashMap::new();
547                let keys = demux.keys().cloned().collect();
548                for (id, pipe) in demux {
549                    connected_demux.insert(
550                        id,
551                        Box::pin(
552                            T::from_defn(ServerOrBound::Server(pipe))
553                                .await
554                                .into_sink()
555                                .buffer(1024),
556                        ),
557                    );
558                }
559
560                let demuxer = DemuxDrain {
561                    marker: PhantomData,
562                    sinks: connected_demux,
563                };
564
565                ConnectedDemux {
566                    keys,
567                    sink: Some(demuxer),
568                }
569            }
570
571            ServerOrBound::Bound(BoundConnection::Demux(demux)) => {
572                let mut connected_demux = HashMap::new();
573                let keys = demux.keys().cloned().collect();
574                for (id, bound) in demux {
575                    connected_demux.insert(
576                        id,
577                        Box::pin(
578                            T::from_defn(ServerOrBound::Bound(bound))
579                                .await
580                                .into_sink()
581                                .buffer(1024),
582                        ),
583                    );
584                }
585
586                let demuxer = DemuxDrain {
587                    marker: PhantomData,
588                    sinks: connected_demux,
589                };
590
591                ConnectedDemux {
592                    keys,
593                    sink: Some(demuxer),
594                }
595            }
596            _ => panic!("Cannot connect to a non-demux pipe as a demux"),
597        }
598    }
599}
600
601impl<T: ConnectedSink> ConnectedSink for ConnectedDemux<T>
602where
603    <T as ConnectedSink>::Input: 'static + Sync,
604{
605    type Input = (u32, T::Input);
606    type Sink = BufferedDrain<T::Sink, T::Input>;
607
608    fn into_sink(mut self) -> Self::Sink {
609        self.sink.take().unwrap()
610    }
611}
612
/// A stream that interleaves the items of several boxed streams and ends once all of them
/// have ended.
pub struct MergeSource<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> {
    // Ties the merged stream to its item type without storing any item.
    marker: PhantomData<T>,
    // Streams that have not finished yet; finished streams are removed while polling.
    sources: Vec<Pin<Box<S>>>,
}
617
618impl<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> Stream for MergeSource<T, S> {
619    type Item = T;
620
621    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
622        let sources = &mut self.get_mut().sources;
623        let mut next = None;
624
625        let mut i = 0;
626        while i < sources.len() {
627            match sources[i].as_mut().poll_next(cx) {
628                Poll::Ready(Some(v)) => {
629                    next = Some(v);
630                    break;
631                }
632                Poll::Ready(None) => {
633                    // this happens infrequently, so OK to be O(n)
634                    sources.remove(i);
635                }
636                Poll::Pending => {
637                    i += 1;
638                }
639            }
640        }
641
642        if sources.is_empty() {
643            Poll::Ready(None)
644        } else if next.is_none() {
645            Poll::Pending
646        } else {
647            Poll::Ready(next)
648        }
649    }
650}
651
/// A stream adapter that pairs every successfully received item of an inner stream with a
/// fixed `u32` id.
pub struct TaggedSource<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> {
    // Ties the adapter to its payload type without storing any payload.
    marker: PhantomData<T>,
    // Id attached to every item.
    id: u32,
    // The wrapped stream.
    source: Pin<Box<S>>,
}
657
658impl<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> Stream
659    for TaggedSource<T, S>
660{
661    type Item = Result<(u32, T), io::Error>;
662
663    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
664        let id = self.as_ref().id;
665        let source = &mut self.get_mut().source;
666        match source.as_mut().poll_next(cx) {
667            Poll::Ready(Some(v)) => Poll::Ready(Some(v.map(|d| (id, d)))),
668            Poll::Ready(None) => Poll::Ready(None),
669            Poll::Pending => Poll::Pending,
670        }
671    }
672}
673
/// Merged stream of tagged sources: every item is `Result<(tag, payload), io::Error>`.
type MergedMux<T> = MergeSource<
    Result<(u32, <T as ConnectedSource>::Output), io::Error>,
    TaggedSource<<T as ConnectedSource>::Output, <T as ConnectedSource>::Stream>,
>;

/// A connected tagged port (or merge of tagged ports) whose items carry the `u32` tag of the
/// branch they arrived on.
pub struct ConnectedTagged<T: ConnectedSource>
where
    <T as ConnectedSource>::Output: 'static + Sync + Unpin,
{
    // Merge of all tagged branches.
    source: MergedMux<T>,
}
685
686#[async_trait]
687impl<T: Connected + ConnectedSource> Connected for ConnectedTagged<T>
688where
689    <T as ConnectedSource>::Output: 'static + Sync + Unpin,
690{
691    async fn from_defn(pipe: ServerOrBound) -> Self {
692        let sources = match pipe {
693            ServerOrBound::Server(RealizedServerPort::Tagged(pipe, id)) => {
694                vec![(
695                    Box::pin(
696                        T::from_defn(ServerOrBound::Server(*pipe))
697                            .await
698                            .into_source(),
699                    ),
700                    id,
701                )]
702            }
703
704            ServerOrBound::Server(RealizedServerPort::Merge(m)) => {
705                let mut sources = Vec::new();
706                for port in m {
707                    if let RealizedServerPort::Tagged(pipe, id) = port {
708                        sources.push((
709                            Box::pin(
710                                T::from_defn(ServerOrBound::Server(*pipe))
711                                    .await
712                                    .into_source(),
713                            ),
714                            id,
715                        ));
716                    } else {
717                        panic!("Merge port must be tagged");
718                    }
719                }
720
721                sources
722            }
723
724            ServerOrBound::Bound(BoundConnection::Tagged(pipe, id)) => {
725                vec![(
726                    Box::pin(
727                        T::from_defn(ServerOrBound::Bound(*pipe))
728                            .await
729                            .into_source(),
730                    ),
731                    id,
732                )]
733            }
734
735            ServerOrBound::Bound(BoundConnection::Merge(m)) => {
736                let mut sources = Vec::new();
737                for port in m {
738                    if let BoundConnection::Tagged(pipe, id) = port {
739                        sources.push((
740                            Box::pin(
741                                T::from_defn(ServerOrBound::Bound(*pipe))
742                                    .await
743                                    .into_source(),
744                            ),
745                            id,
746                        ));
747                    } else {
748                        panic!("Merge port must be tagged");
749                    }
750                }
751
752                sources
753            }
754
755            _ => panic!("Cannot connect to a non-tagged pipe as a tagged"),
756        };
757
758        let mut connected_mux = Vec::new();
759        for (pipe, id) in sources {
760            connected_mux.push(Box::pin(TaggedSource {
761                marker: PhantomData,
762                id,
763                source: pipe,
764            }));
765        }
766
767        let muxer = MergeSource {
768            marker: PhantomData,
769            sources: connected_mux,
770        };
771
772        ConnectedTagged { source: muxer }
773    }
774}
775
impl<T: ConnectedSource> ConnectedSource for ConnectedTagged<T>
where
    <T as ConnectedSource>::Output: 'static + Sync + Unpin,
{
    /// Each item is the tag of the branch it arrived on together with the payload.
    type Output = (u32, T::Output);
    /// Merge of the tagged branch streams.
    type Stream = MergeSource<Result<Self::Output, io::Error>, TaggedSource<T::Output, T::Stream>>;

    /// Consumes the connection and returns the merged, tagged stream.
    fn into_source(self) -> Self::Stream {
        self.source
    }
}