moq_dir/
session.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_transport::session::{Announced, Publisher, Subscriber};

use crate::Listings;

#[derive(Clone)]
pub struct Session {
    session: web_transport::Session,
    listings: Listings,
}

impl Session {
    pub fn new(session: web_transport::Session, listings: Listings) -> Self {
        Self { session, listings }
    }

    pub async fn run(self) -> anyhow::Result<()> {
        let session = self.session.clone();
        let (session, publisher, subscriber) =
            moq_transport::session::Session::accept(session).await?;

        let mut tasks = FuturesUnordered::new();
        tasks.push(async move { session.run().await.map_err(Into::into) }.boxed());

        if let Some(remote) = publisher {
            tasks.push(Self::serve_subscriber(self.clone(), remote).boxed());
        }

        if let Some(remote) = subscriber {
            tasks.push(Self::serve_publisher(self.clone(), remote).boxed());
        }

        // Return the first error
        tasks.select_next_some().await?;

        Ok(())
    }

    async fn serve_subscriber(self, mut remote: Publisher) -> anyhow::Result<()> {
        // Announce our namespace and serve any matching subscriptions AUTOMATICALLY
        remote.announce(self.listings.tracks()).await?;

        Ok(())
    }

    async fn serve_publisher(self, mut remote: Subscriber) -> anyhow::Result<()> {
        let mut tasks = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(announce) = remote.announced() => {
                    let this = self.clone();

                    tasks.push(async move {
                        let info = announce.clone();
                        log::info!("serving announce: {:?}", info);

                        if let Err(err) = this.serve_announce(announce).await {
                            log::warn!("failed serving announce: {:?}, error: {}", info, err)
                        }
                    });
                },
                _ = tasks.next(), if !tasks.is_empty() => {},
                else => return Ok(()),
            };
        }
    }

    async fn serve_announce(mut self, mut announce: Announced) -> anyhow::Result<()> {
        announce.ok()?;

        match self.listings.register(&announce.namespace.to_utf8_path()) {
            Ok(_) => announce.closed().await?,
            Err(err) => {
                announce.close(err.clone())?;
                return Err(err.into());
            }
        }

        Ok(())
    }
}