moq_dir/
listings.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Mutex},
4};
5
6use moq_transport::coding::TrackNamespace;
7use moq_transport::serve::{ServeError, Tracks, TracksReader, TracksWriter};
8
9use crate::{ListingReader, ListingWriter};
10
11struct State {
12    writer: TracksWriter,
13    active: HashMap<String, ListingWriter>,
14}
15
16#[derive(Clone)]
17pub struct Listings {
18    state: Arc<Mutex<State>>,
19    reader: TracksReader,
20}
21
22impl Listings {
23    pub fn new(namespace: String) -> Self {
24        let (writer, _, reader) = Tracks::new(TrackNamespace::from_utf8_path(&namespace)).produce();
25
26        let state = State {
27            writer,
28            active: HashMap::new(),
29        };
30
31        Self {
32            state: Arc::new(Mutex::new(state)),
33            reader,
34        }
35    }
36
37    // Returns a Registration that removes on drop.
38    pub fn register(&mut self, path: &str) -> Result<Option<Registration>, ServeError> {
39        let (prefix, base) = Self::prefix(path);
40
41        let namespace = self.reader.namespace.to_utf8_path();
42
43        if !prefix.starts_with(&namespace) {
44            // Ignore anything that isn't in our namespace.
45            return Ok(None);
46        }
47
48        // Remove the namespace prefix from the path.
49        let prefix = &prefix[namespace.len()..];
50
51        let mut state = self.state.lock().unwrap();
52        if let Some(listing) = state.active.get_mut(prefix) {
53            listing.insert(base.to_string())?;
54        } else {
55            log::info!("creating prefix: {}", prefix);
56            let track = state.writer.create(prefix).unwrap();
57
58            let mut listing = ListingWriter::new(track);
59            listing.insert(base.to_string())?;
60            state.active.insert(prefix.to_string(), listing);
61        }
62
63        log::info!("added listing: {} {}", prefix, base);
64
65        Ok(Some(Registration {
66            listing: self.clone(),
67            prefix: prefix.to_string(),
68            base: base.to_string(),
69        }))
70    }
71
72    fn remove(&mut self, prefix: &str, base: &str) -> Result<(), ServeError> {
73        let mut state = self.state.lock().unwrap();
74
75        let listing = state.active.get_mut(prefix).ok_or(ServeError::NotFound)?;
76        listing.remove(base)?;
77
78        log::info!("removed listing: {} {}", prefix, base);
79
80        if listing.is_empty() {
81            log::info!("removed prefix: {}", prefix);
82            state.active.remove(prefix);
83            state.writer.remove(prefix);
84        }
85
86        Ok(())
87    }
88
89    pub fn subscribe(&mut self, name: &str) -> Option<ListingReader> {
90        self.reader.subscribe(name).map(ListingReader::new)
91    }
92
93    pub fn tracks(&self) -> TracksReader {
94        self.reader.clone()
95    }
96
97    // Returns the prefix for the string.
98    // This is just the content before the last '/', like a directory name.
99    // ex. "/foo/bar/baz" -> ("/foo/bar", "baz")
100    pub fn prefix(path: &str) -> (&str, &str) {
101        // Find the last '/' and return the parts.
102        match path.rfind('.') {
103            Some(index) => (&path[..index + 1], &path[index + 1..]),
104            None => ("", path),
105        }
106    }
107}
108
109// Used to remove the registration on drop.
110pub struct Registration {
111    listing: Listings,
112    prefix: String,
113    base: String,
114}
115
116impl Drop for Registration {
117    fn drop(&mut self) {
118        self.listing.remove(&self.prefix, &self.base).ok();
119    }
120}
121
122#[cfg(test)]
123mod tests {
124    use super::*;
125
126    #[test]
127    fn test_bucket() {
128        assert!(Listings::prefix(".") == (".", ""));
129        assert!(Listings::prefix(".foo") == (".", "foo"));
130        assert!(Listings::prefix(".foo.") == (".foo.", ""));
131        assert!(Listings::prefix(".foo.bar") == (".foo.", "bar"));
132        assert!(Listings::prefix(".foo.bar.") == (".foo.bar.", ""));
133        assert!(Listings::prefix(".foo.bar.baz") == (".foo.bar.", "baz"));
134        assert!(Listings::prefix(".foo.bar.baz.") == (".foo.bar.baz.", ""));
135
136        assert!(Listings::prefix("") == ("", ""));
137        assert!(Listings::prefix("foo") == ("", "foo"));
138        assert!(Listings::prefix("foo.") == ("foo.", ""));
139        assert!(Listings::prefix("foo.bar") == ("foo.", "bar"));
140        assert!(Listings::prefix("foo.bar.") == ("foo.bar.", ""));
141        assert!(Listings::prefix("foo.bar.baz") == ("foo.bar.", "baz"));
142        assert!(Listings::prefix("foo.bar.baz.") == ("foo.bar.baz.", ""));
143    }
144}