1use std::collections::HashMap;
2use std::marker::PhantomData;
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use async_recursion::async_recursion;
10use async_trait::async_trait;
11use bytes::{Bytes, BytesMut};
12use futures::sink::Buffer;
13use futures::{ready, stream, Future, Sink, SinkExt, Stream};
14use pin_project::pin_project;
15use serde::{Deserialize, Serialize};
16use tokio::io;
17use tokio::net::{TcpListener, TcpStream};
18#[cfg(unix)]
19use tokio::net::{UnixListener, UnixStream};
20use tokio::task::JoinHandle;
21use tokio_stream::wrappers::TcpListenerStream;
22use tokio_stream::StreamExt;
23use tokio_util::codec::{Framed, LengthDelimitedCodec};
24
25pub type InitConfig = (HashMap<String, ServerBindConfig>, Option<String>);
26
27#[cfg(not(unix))]
28#[allow(dead_code)]
29type UnixStream = ();
30
31#[cfg(not(unix))]
32#[allow(dead_code)]
33type UnixListener = ();
34
35#[allow(unreachable_code)]
37#[derive(Serialize, Deserialize, Clone, Debug)]
38pub enum ServerPort {
39 UnixSocket(PathBuf),
40 TcpPort(SocketAddr),
41 Demux(HashMap<u32, ServerPort>),
42 Merge(Vec<ServerPort>),
43 Tagged(Box<ServerPort>, u32),
44 Null,
45}
46
47impl ServerPort {
48 pub fn instantiate(&self) -> ServerOrBound {
49 ServerOrBound::Server(self.into())
50 }
51}
52
53#[derive(Debug)]
54pub enum RealizedServerPort {
55 UnixSocket(JoinHandle<io::Result<UnixStream>>),
56 TcpPort(JoinHandle<io::Result<TcpStream>>),
57 Demux(HashMap<u32, RealizedServerPort>),
58 Merge(Vec<RealizedServerPort>),
59 Tagged(Box<RealizedServerPort>, u32),
60 Null,
61}
62
63impl From<&ServerPort> for RealizedServerPort {
64 fn from(port: &ServerPort) -> Self {
65 match port {
66 ServerPort::UnixSocket(path) => {
67 #[cfg(unix)]
68 {
69 let bound = UnixStream::connect(path.clone());
70 RealizedServerPort::UnixSocket(tokio::spawn(bound))
71 }
72
73 #[cfg(not(unix))]
74 {
75 let _ = path;
76 panic!("Unix sockets are not supported on this platform")
77 }
78 }
79 ServerPort::TcpPort(addr) => {
80 let addr_clone = *addr;
81 let bound = async_retry(
82 move || TcpStream::connect(addr_clone),
83 10,
84 Duration::from_secs(1),
85 );
86 RealizedServerPort::TcpPort(tokio::spawn(bound))
87 }
88 ServerPort::Demux(bindings) => {
89 RealizedServerPort::Demux(bindings.iter().map(|(k, v)| (*k, v.into())).collect())
90 }
91 ServerPort::Merge(ports) => {
92 RealizedServerPort::Merge(ports.iter().map(|p| p.into()).collect())
93 }
94 ServerPort::Tagged(port, tag) => {
95 RealizedServerPort::Tagged(Box::new(port.as_ref().into()), *tag)
96 }
97 ServerPort::Null => RealizedServerPort::Null,
98 }
99 }
100}
101
102#[derive(Serialize, Deserialize, Clone, Debug)]
103pub enum ServerBindConfig {
104 UnixSocket,
105 TcpPort(
106 String,
108 ),
109 Demux(HashMap<u32, ServerBindConfig>),
110 Merge(Vec<ServerBindConfig>),
111 Tagged(Box<ServerBindConfig>, u32),
112 Null,
113}
114
115impl ServerBindConfig {
116 #[async_recursion]
117 pub async fn bind(self) -> BoundConnection {
118 match self {
119 ServerBindConfig::UnixSocket => {
120 #[cfg(unix)]
121 {
122 let dir = tempfile::tempdir().unwrap();
123 let socket_path = dir.path().join("socket");
124 let bound = UnixListener::bind(socket_path).unwrap();
125 BoundConnection::UnixSocket(
126 tokio::spawn(async move { Ok(bound.accept().await?.0) }),
127 dir,
128 )
129 }
130
131 #[cfg(not(unix))]
132 {
133 panic!("Unix sockets are not supported on this platform")
134 }
135 }
136 ServerBindConfig::TcpPort(host) => {
137 let listener = TcpListener::bind((host, 0)).await.unwrap();
138 let addr = listener.local_addr().unwrap();
139 BoundConnection::TcpPort(TcpListenerStream::new(listener), addr)
140 }
141 ServerBindConfig::Demux(bindings) => {
142 let mut demux = HashMap::new();
143 for (key, bind) in bindings {
144 demux.insert(key, bind.bind().await);
145 }
146 BoundConnection::Demux(demux)
147 }
148 ServerBindConfig::Merge(bindings) => {
149 let mut merge = Vec::new();
150 for bind in bindings {
151 merge.push(bind.bind().await);
152 }
153 BoundConnection::Merge(merge)
154 }
155 ServerBindConfig::Tagged(underlying, id) => {
156 BoundConnection::Tagged(Box::new(underlying.bind().await), id)
157 }
158 ServerBindConfig::Null => BoundConnection::Null,
159 }
160 }
161}
162
163#[derive(Debug)]
164pub enum ServerOrBound {
165 Server(RealizedServerPort),
166 Bound(BoundConnection),
167}
168
169impl ServerOrBound {
170 pub async fn connect<T: Connected>(self) -> T {
171 T::from_defn(self).await
172 }
173
174 pub fn connect_local_blocking<T: Connected>(self) -> T {
175 let handle = tokio::runtime::Handle::current();
176 let _guard = handle.enter();
177 futures::executor::block_on(T::from_defn(self))
178 }
179
180 pub async fn accept_tcp(&mut self) -> TcpStream {
181 if let ServerOrBound::Bound(BoundConnection::TcpPort(handle, _)) = self {
182 handle.next().await.unwrap().unwrap()
183 } else {
184 panic!("Not a TCP port")
185 }
186 }
187}
188
189pub type DynStream = Pin<Box<dyn Stream<Item = Result<BytesMut, io::Error>> + Send + Sync>>;
190
191pub type DynSink<Input> = Pin<Box<dyn Sink<Input, Error = io::Error> + Send + Sync>>;
192
193pub trait StreamSink:
194 Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>
195{
196}
197impl<T: Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error>> StreamSink
198 for T
199{
200}
201
202pub type DynStreamSink = Pin<Box<dyn StreamSink + Send + Sync>>;
203
204#[async_trait]
205pub trait Connected: Send {
206 async fn from_defn(pipe: ServerOrBound) -> Self;
207}
208
209pub trait ConnectedSink {
210 type Input: Send;
211 type Sink: Sink<Self::Input, Error = io::Error> + Send + Sync;
212
213 fn into_sink(self) -> Self::Sink;
214}
215
216pub trait ConnectedSource {
217 type Output: Send;
218 type Stream: Stream<Item = Result<Self::Output, io::Error>> + Send + Sync;
219 fn into_source(self) -> Self::Stream;
220}
221
222#[derive(Debug)]
223pub enum BoundConnection {
224 UnixSocket(JoinHandle<io::Result<UnixStream>>, tempfile::TempDir),
225 TcpPort(TcpListenerStream, SocketAddr),
226 Demux(HashMap<u32, BoundConnection>),
227 Merge(Vec<BoundConnection>),
228 Tagged(Box<BoundConnection>, u32),
229 Null,
230}
231
232impl BoundConnection {
233 pub fn sink_port(&self) -> ServerPort {
234 match self {
235 BoundConnection::UnixSocket(_, tempdir) => {
236 #[cfg(unix)]
237 {
238 ServerPort::UnixSocket(tempdir.path().join("socket"))
239 }
240
241 #[cfg(not(unix))]
242 {
243 let _ = tempdir;
244 panic!("Unix sockets are not supported on this platform")
245 }
246 }
247 BoundConnection::TcpPort(_, addr) => {
248 ServerPort::TcpPort(SocketAddr::new(addr.ip(), addr.port()))
249 }
250
251 BoundConnection::Demux(bindings) => {
252 let mut demux = HashMap::new();
253 for (key, bind) in bindings {
254 demux.insert(*key, bind.sink_port());
255 }
256 ServerPort::Demux(demux)
257 }
258
259 BoundConnection::Merge(bindings) => {
260 let mut merge = Vec::new();
261 for bind in bindings {
262 merge.push(bind.sink_port());
263 }
264 ServerPort::Merge(merge)
265 }
266
267 BoundConnection::Tagged(underlying, id) => {
268 ServerPort::Tagged(Box::new(underlying.sink_port()), *id)
269 }
270
271 BoundConnection::Null => ServerPort::Null,
272 }
273 }
274}
275
276#[async_recursion]
277async fn accept(bound: BoundConnection) -> ConnectedDirect {
278 match bound {
279 BoundConnection::UnixSocket(listener, _) => {
280 #[cfg(unix)]
281 {
282 let stream = listener.await.unwrap().unwrap();
283 ConnectedDirect {
284 stream_sink: Some(Box::pin(unix_bytes(stream))),
285 source_only: None,
286 sink_only: None,
287 }
288 }
289
290 #[cfg(not(unix))]
291 {
292 drop(listener);
293 panic!("Unix sockets are not supported on this platform")
294 }
295 }
296 BoundConnection::TcpPort(mut listener, _) => {
297 let stream = listener.next().await.unwrap().unwrap();
298 ConnectedDirect {
299 stream_sink: Some(Box::pin(tcp_bytes(stream))),
300 source_only: None,
301 sink_only: None,
302 }
303 }
304 BoundConnection::Merge(merge) => {
305 let mut sources = vec![];
306 for bound in merge {
307 sources.push(accept(bound).await.into_source());
308 }
309
310 let merge_source: DynStream = Box::pin(MergeSource {
311 marker: PhantomData,
312 sources,
313 });
314
315 ConnectedDirect {
316 stream_sink: None,
317 source_only: Some(merge_source),
318 sink_only: None,
319 }
320 }
321 BoundConnection::Demux(_) => panic!("Cannot connect to a demux pipe directly"),
322 BoundConnection::Tagged(_, _) => panic!("Cannot connect to a tagged pipe directly"),
323 BoundConnection::Null => {
324 ConnectedDirect::from_defn(ServerOrBound::Server(RealizedServerPort::Null)).await
325 }
326 }
327}
328
329fn tcp_bytes(stream: TcpStream) -> impl StreamSink {
330 Framed::new(stream, LengthDelimitedCodec::new())
331}
332
333#[cfg(unix)]
334fn unix_bytes(stream: UnixStream) -> impl StreamSink {
335 Framed::new(stream, LengthDelimitedCodec::new())
336}
337
338struct IoErrorDrain<T> {
339 marker: PhantomData<T>,
340}
341
342impl<T> Sink<T> for IoErrorDrain<T> {
343 type Error = io::Error;
344
345 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
346 Poll::Ready(Ok(()))
347 }
348
349 fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Self::Error> {
350 Ok(())
351 }
352
353 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
354 Poll::Ready(Ok(()))
355 }
356
357 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
358 Poll::Ready(Ok(()))
359 }
360}
361
362async fn async_retry<T, E, F: Future<Output = Result<T, E>>>(
363 thunk: impl Fn() -> F,
364 count: usize,
365 delay: Duration,
366) -> Result<T, E> {
367 for _ in 1..count {
368 let result = thunk().await;
369 if result.is_ok() {
370 return result;
371 } else {
372 tokio::time::sleep(delay).await;
373 }
374 }
375
376 thunk().await
377}
378
379pub struct ConnectedDirect {
380 stream_sink: Option<DynStreamSink>,
381 source_only: Option<DynStream>,
382 sink_only: Option<DynSink<Bytes>>,
383}
384
385#[async_trait]
386impl Connected for ConnectedDirect {
387 async fn from_defn(pipe: ServerOrBound) -> Self {
388 match pipe {
389 ServerOrBound::Server(RealizedServerPort::UnixSocket(stream)) => {
390 #[cfg(unix)]
391 {
392 let stream = stream.await.unwrap().unwrap();
393 ConnectedDirect {
394 stream_sink: Some(Box::pin(unix_bytes(stream))),
395 source_only: None,
396 sink_only: None,
397 }
398 }
399
400 #[cfg(not(unix))]
401 {
402 drop(stream);
403 panic!("Unix sockets are not supported on this platform");
404 }
405 }
406 ServerOrBound::Server(RealizedServerPort::TcpPort(stream)) => {
407 let stream = stream.await.unwrap().unwrap();
408 stream.set_nodelay(true).unwrap();
409 ConnectedDirect {
410 stream_sink: Some(Box::pin(tcp_bytes(stream))),
411 source_only: None,
412 sink_only: None,
413 }
414 }
415 ServerOrBound::Server(RealizedServerPort::Merge(merge)) => {
416 let sources = futures::future::join_all(merge.into_iter().map(|port| async {
417 ConnectedDirect::from_defn(ServerOrBound::Server(port))
418 .await
419 .into_source()
420 }))
421 .await;
422
423 let merged = MergeSource {
424 marker: PhantomData,
425 sources,
426 };
427
428 ConnectedDirect {
429 stream_sink: None,
430 source_only: Some(Box::pin(merged)),
431 sink_only: None,
432 }
433 }
434 ServerOrBound::Server(RealizedServerPort::Demux(_)) => {
435 panic!("Cannot connect to a demux pipe directly")
436 }
437
438 ServerOrBound::Server(RealizedServerPort::Tagged(_, _)) => {
439 panic!("Cannot connect to a tagged pipe directly")
440 }
441
442 ServerOrBound::Server(RealizedServerPort::Null) => ConnectedDirect {
443 stream_sink: None,
444 source_only: Some(Box::pin(stream::empty())),
445 sink_only: Some(Box::pin(IoErrorDrain {
446 marker: PhantomData,
447 })),
448 },
449
450 ServerOrBound::Bound(bound) => accept(bound).await,
451 }
452 }
453}
454
455impl ConnectedSource for ConnectedDirect {
456 type Output = BytesMut;
457 type Stream = DynStream;
458
459 fn into_source(mut self) -> DynStream {
460 if let Some(s) = self.stream_sink.take() {
461 Box::pin(s)
462 } else {
463 self.source_only.take().unwrap()
464 }
465 }
466}
467
468impl ConnectedSink for ConnectedDirect {
469 type Input = Bytes;
470 type Sink = DynSink<Bytes>;
471
472 fn into_sink(mut self) -> DynSink<Self::Input> {
473 if let Some(s) = self.stream_sink.take() {
474 Box::pin(s)
475 } else {
476 self.sink_only.take().unwrap()
477 }
478 }
479}
480
481pub type BufferedDrain<S, I> = DemuxDrain<I, Buffer<S, I>>;
482
483pub struct ConnectedDemux<T: ConnectedSink>
484where
485 <T as ConnectedSink>::Input: Sync,
486{
487 pub keys: Vec<u32>,
488 sink: Option<BufferedDrain<T::Sink, T::Input>>,
489}
490
491#[pin_project]
492pub struct DemuxDrain<T, S: Sink<T, Error = io::Error> + Send + Sync + ?Sized> {
493 marker: PhantomData<T>,
494 #[pin]
495 sinks: HashMap<u32, Pin<Box<S>>>,
496}
497
498impl<T, S: Sink<T, Error = io::Error> + Send + Sync> Sink<(u32, T)> for DemuxDrain<T, S> {
499 type Error = io::Error;
500
501 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
502 for sink in self.project().sinks.values_mut() {
503 ready!(Sink::poll_ready(sink.as_mut(), _cx))?;
504 }
505
506 Poll::Ready(Ok(()))
507 }
508
509 fn start_send(self: Pin<&mut Self>, item: (u32, T)) -> Result<(), Self::Error> {
510 Sink::start_send(
511 self.project()
512 .sinks
513 .get_mut()
514 .get_mut(&item.0)
515 .unwrap_or_else(|| panic!("No sink in this demux for key {}", item.0))
516 .as_mut(),
517 item.1,
518 )
519 }
520
521 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
522 for sink in self.project().sinks.values_mut() {
523 ready!(Sink::poll_flush(sink.as_mut(), _cx))?;
524 }
525
526 Poll::Ready(Ok(()))
527 }
528
529 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
530 for sink in self.project().sinks.values_mut() {
531 ready!(Sink::poll_close(sink.as_mut(), _cx))?;
532 }
533
534 Poll::Ready(Ok(()))
535 }
536}
537
538#[async_trait]
539impl<T: Connected + ConnectedSink> Connected for ConnectedDemux<T>
540where
541 <T as ConnectedSink>::Input: 'static + Sync,
542{
543 async fn from_defn(pipe: ServerOrBound) -> Self {
544 match pipe {
545 ServerOrBound::Server(RealizedServerPort::Demux(demux)) => {
546 let mut connected_demux = HashMap::new();
547 let keys = demux.keys().cloned().collect();
548 for (id, pipe) in demux {
549 connected_demux.insert(
550 id,
551 Box::pin(
552 T::from_defn(ServerOrBound::Server(pipe))
553 .await
554 .into_sink()
555 .buffer(1024),
556 ),
557 );
558 }
559
560 let demuxer = DemuxDrain {
561 marker: PhantomData,
562 sinks: connected_demux,
563 };
564
565 ConnectedDemux {
566 keys,
567 sink: Some(demuxer),
568 }
569 }
570
571 ServerOrBound::Bound(BoundConnection::Demux(demux)) => {
572 let mut connected_demux = HashMap::new();
573 let keys = demux.keys().cloned().collect();
574 for (id, bound) in demux {
575 connected_demux.insert(
576 id,
577 Box::pin(
578 T::from_defn(ServerOrBound::Bound(bound))
579 .await
580 .into_sink()
581 .buffer(1024),
582 ),
583 );
584 }
585
586 let demuxer = DemuxDrain {
587 marker: PhantomData,
588 sinks: connected_demux,
589 };
590
591 ConnectedDemux {
592 keys,
593 sink: Some(demuxer),
594 }
595 }
596 _ => panic!("Cannot connect to a non-demux pipe as a demux"),
597 }
598 }
599}
600
601impl<T: ConnectedSink> ConnectedSink for ConnectedDemux<T>
602where
603 <T as ConnectedSink>::Input: 'static + Sync,
604{
605 type Input = (u32, T::Input);
606 type Sink = BufferedDrain<T::Sink, T::Input>;
607
608 fn into_sink(mut self) -> Self::Sink {
609 self.sink.take().unwrap()
610 }
611}
612
613pub struct MergeSource<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> {
614 marker: PhantomData<T>,
615 sources: Vec<Pin<Box<S>>>,
616}
617
618impl<T: Unpin, S: Stream<Item = T> + Send + Sync + ?Sized> Stream for MergeSource<T, S> {
619 type Item = T;
620
621 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
622 let sources = &mut self.get_mut().sources;
623 let mut next = None;
624
625 let mut i = 0;
626 while i < sources.len() {
627 match sources[i].as_mut().poll_next(cx) {
628 Poll::Ready(Some(v)) => {
629 next = Some(v);
630 break;
631 }
632 Poll::Ready(None) => {
633 sources.remove(i);
635 }
636 Poll::Pending => {
637 i += 1;
638 }
639 }
640 }
641
642 if sources.is_empty() {
643 Poll::Ready(None)
644 } else if next.is_none() {
645 Poll::Pending
646 } else {
647 Poll::Ready(next)
648 }
649 }
650}
651
652pub struct TaggedSource<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> {
653 marker: PhantomData<T>,
654 id: u32,
655 source: Pin<Box<S>>,
656}
657
658impl<T: Unpin, S: Stream<Item = Result<T, io::Error>> + Send + Sync + ?Sized> Stream
659 for TaggedSource<T, S>
660{
661 type Item = Result<(u32, T), io::Error>;
662
663 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
664 let id = self.as_ref().id;
665 let source = &mut self.get_mut().source;
666 match source.as_mut().poll_next(cx) {
667 Poll::Ready(Some(v)) => Poll::Ready(Some(v.map(|d| (id, d)))),
668 Poll::Ready(None) => Poll::Ready(None),
669 Poll::Pending => Poll::Pending,
670 }
671 }
672}
673
674type MergedMux<T> = MergeSource<
675 Result<(u32, <T as ConnectedSource>::Output), io::Error>,
676 TaggedSource<<T as ConnectedSource>::Output, <T as ConnectedSource>::Stream>,
677>;
678
679pub struct ConnectedTagged<T: ConnectedSource>
680where
681 <T as ConnectedSource>::Output: 'static + Sync + Unpin,
682{
683 source: MergedMux<T>,
684}
685
686#[async_trait]
687impl<T: Connected + ConnectedSource> Connected for ConnectedTagged<T>
688where
689 <T as ConnectedSource>::Output: 'static + Sync + Unpin,
690{
691 async fn from_defn(pipe: ServerOrBound) -> Self {
692 let sources = match pipe {
693 ServerOrBound::Server(RealizedServerPort::Tagged(pipe, id)) => {
694 vec![(
695 Box::pin(
696 T::from_defn(ServerOrBound::Server(*pipe))
697 .await
698 .into_source(),
699 ),
700 id,
701 )]
702 }
703
704 ServerOrBound::Server(RealizedServerPort::Merge(m)) => {
705 let mut sources = Vec::new();
706 for port in m {
707 if let RealizedServerPort::Tagged(pipe, id) = port {
708 sources.push((
709 Box::pin(
710 T::from_defn(ServerOrBound::Server(*pipe))
711 .await
712 .into_source(),
713 ),
714 id,
715 ));
716 } else {
717 panic!("Merge port must be tagged");
718 }
719 }
720
721 sources
722 }
723
724 ServerOrBound::Bound(BoundConnection::Tagged(pipe, id)) => {
725 vec![(
726 Box::pin(
727 T::from_defn(ServerOrBound::Bound(*pipe))
728 .await
729 .into_source(),
730 ),
731 id,
732 )]
733 }
734
735 ServerOrBound::Bound(BoundConnection::Merge(m)) => {
736 let mut sources = Vec::new();
737 for port in m {
738 if let BoundConnection::Tagged(pipe, id) = port {
739 sources.push((
740 Box::pin(
741 T::from_defn(ServerOrBound::Bound(*pipe))
742 .await
743 .into_source(),
744 ),
745 id,
746 ));
747 } else {
748 panic!("Merge port must be tagged");
749 }
750 }
751
752 sources
753 }
754
755 _ => panic!("Cannot connect to a non-tagged pipe as a tagged"),
756 };
757
758 let mut connected_mux = Vec::new();
759 for (pipe, id) in sources {
760 connected_mux.push(Box::pin(TaggedSource {
761 marker: PhantomData,
762 id,
763 source: pipe,
764 }));
765 }
766
767 let muxer = MergeSource {
768 marker: PhantomData,
769 sources: connected_mux,
770 };
771
772 ConnectedTagged { source: muxer }
773 }
774}
775
776impl<T: ConnectedSource> ConnectedSource for ConnectedTagged<T>
777where
778 <T as ConnectedSource>::Output: 'static + Sync + Unpin,
779{
780 type Output = (u32, T::Output);
781 type Stream = MergeSource<Result<Self::Output, io::Error>, TaggedSource<T::Output, T::Stream>>;
782
783 fn into_source(self) -> Self::Stream {
784 self.source
785 }
786}