1use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
2use moq_transport::session::{Announced, Publisher, Subscriber};
3
4use crate::Listings;
5
6#[derive(Clone)]
7pub struct Session {
8 session: web_transport::Session,
9 listings: Listings,
10}
11
12impl Session {
13 pub fn new(session: web_transport::Session, listings: Listings) -> Self {
14 Self { session, listings }
15 }
16
17 pub async fn run(self) -> anyhow::Result<()> {
18 let session = self.session.clone();
19 let (session, publisher, subscriber) =
20 moq_transport::session::Session::accept(session).await?;
21
22 let mut tasks = FuturesUnordered::new();
23 tasks.push(async move { session.run().await.map_err(Into::into) }.boxed());
24
25 if let Some(remote) = publisher {
26 tasks.push(Self::serve_subscriber(self.clone(), remote).boxed());
27 }
28
29 if let Some(remote) = subscriber {
30 tasks.push(Self::serve_publisher(self.clone(), remote).boxed());
31 }
32
33 tasks.select_next_some().await?;
35
36 Ok(())
37 }
38
39 async fn serve_subscriber(self, mut remote: Publisher) -> anyhow::Result<()> {
40 remote.announce(self.listings.tracks()).await?;
42
43 Ok(())
44 }
45
46 async fn serve_publisher(self, mut remote: Subscriber) -> anyhow::Result<()> {
47 let mut tasks = FuturesUnordered::new();
48
49 loop {
50 tokio::select! {
51 Some(announce) = remote.announced() => {
52 let this = self.clone();
53
54 tasks.push(async move {
55 let info = announce.clone();
56 log::info!("serving announce: {:?}", info);
57
58 if let Err(err) = this.serve_announce(announce).await {
59 log::warn!("failed serving announce: {:?}, error: {}", info, err)
60 }
61 });
62 },
63 _ = tasks.next(), if !tasks.is_empty() => {},
64 else => return Ok(()),
65 };
66 }
67 }
68
69 async fn serve_announce(mut self, mut announce: Announced) -> anyhow::Result<()> {
70 announce.ok()?;
71
72 match self.listings.register(&announce.namespace.to_utf8_path()) {
73 Ok(_) => announce.closed().await?,
74 Err(err) => {
75 announce.close(err.clone())?;
76 return Err(err.into());
77 }
78 }
79
80 Ok(())
81 }
82}