moq_dir/
session.rs

1use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
2use moq_transport::session::{Announced, Publisher, Subscriber};
3
4use crate::Listings;
5
6#[derive(Clone)]
7pub struct Session {
8    session: web_transport::Session,
9    listings: Listings,
10}
11
12impl Session {
13    pub fn new(session: web_transport::Session, listings: Listings) -> Self {
14        Self { session, listings }
15    }
16
17    pub async fn run(self) -> anyhow::Result<()> {
18        let session = self.session.clone();
19        let (session, publisher, subscriber) =
20            moq_transport::session::Session::accept(session).await?;
21
22        let mut tasks = FuturesUnordered::new();
23        tasks.push(async move { session.run().await.map_err(Into::into) }.boxed());
24
25        if let Some(remote) = publisher {
26            tasks.push(Self::serve_subscriber(self.clone(), remote).boxed());
27        }
28
29        if let Some(remote) = subscriber {
30            tasks.push(Self::serve_publisher(self.clone(), remote).boxed());
31        }
32
33        // Return the first error
34        tasks.select_next_some().await?;
35
36        Ok(())
37    }
38
39    async fn serve_subscriber(self, mut remote: Publisher) -> anyhow::Result<()> {
40        // Announce our namespace and serve any matching subscriptions AUTOMATICALLY
41        remote.announce(self.listings.tracks()).await?;
42
43        Ok(())
44    }
45
46    async fn serve_publisher(self, mut remote: Subscriber) -> anyhow::Result<()> {
47        let mut tasks = FuturesUnordered::new();
48
49        loop {
50            tokio::select! {
51                Some(announce) = remote.announced() => {
52                    let this = self.clone();
53
54                    tasks.push(async move {
55                        let info = announce.clone();
56                        log::info!("serving announce: {:?}", info);
57
58                        if let Err(err) = this.serve_announce(announce).await {
59                            log::warn!("failed serving announce: {:?}, error: {}", info, err)
60                        }
61                    });
62                },
63                _ = tasks.next(), if !tasks.is_empty() => {},
64                else => return Ok(()),
65            };
66        }
67    }
68
69    async fn serve_announce(mut self, mut announce: Announced) -> anyhow::Result<()> {
70        announce.ok()?;
71
72        match self.listings.register(&announce.namespace.to_utf8_path()) {
73            Ok(_) => announce.closed().await?,
74            Err(err) => {
75                announce.close(err.clone())?;
76                return Err(err.into());
77            }
78        }
79
80        Ok(())
81    }
82}