1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4};
5
6use moq_transport::coding::TrackNamespace;
7use moq_transport::serve::{ServeError, Tracks, TracksReader, TracksWriter};
8
9use crate::{ListingReader, ListingWriter};
10
/// Mutable bookkeeping shared by every clone of [`Listings`] (held behind a mutex).
struct State {
    /// Writer half of the tracks collection; used to create a track when a
    /// prefix is first registered and to remove it once its listing is empty.
    writer: TracksWriter,
    /// Listing writers that currently exist, keyed by prefix (with the
    /// namespace already stripped).
    active: HashMap<String, ListingWriter>,
}
15
/// Cloneable handle to a set of listings, grouped by path prefix.
///
/// Clones share the same [`State`] through the `Arc<Mutex<_>>`; each clone
/// carries its own copy of the tracks reader.
#[derive(Clone)]
pub struct Listings {
    /// Shared, lock-protected writer state.
    state: Arc<Mutex<State>>,
    /// Reader half of the tracks collection; also the source of the namespace
    /// that registered paths are checked against.
    reader: TracksReader,
}
21
22impl Listings {
23 pub fn new(namespace: String) -> Self {
24 let (writer, _, reader) = Tracks::new(TrackNamespace::from_utf8_path(&namespace)).produce();
25
26 let state = State {
27 writer,
28 active: HashMap::new(),
29 };
30
31 Self {
32 state: Arc::new(Mutex::new(state)),
33 reader,
34 }
35 }
36
37 pub fn register(&mut self, path: &str) -> Result<Option<Registration>, ServeError> {
39 let (prefix, base) = Self::prefix(path);
40
41 let namespace = self.reader.namespace.to_utf8_path();
42
43 if !prefix.starts_with(&namespace) {
44 return Ok(None);
46 }
47
48 let prefix = &prefix[namespace.len()..];
50
51 let mut state = self.state.lock().unwrap();
52 if let Some(listing) = state.active.get_mut(prefix) {
53 listing.insert(base.to_string())?;
54 } else {
55 log::info!("creating prefix: {}", prefix);
56 let track = state.writer.create(prefix).unwrap();
57
58 let mut listing = ListingWriter::new(track);
59 listing.insert(base.to_string())?;
60 state.active.insert(prefix.to_string(), listing);
61 }
62
63 log::info!("added listing: {} {}", prefix, base);
64
65 Ok(Some(Registration {
66 listing: self.clone(),
67 prefix: prefix.to_string(),
68 base: base.to_string(),
69 }))
70 }
71
72 fn remove(&mut self, prefix: &str, base: &str) -> Result<(), ServeError> {
73 let mut state = self.state.lock().unwrap();
74
75 let listing = state.active.get_mut(prefix).ok_or(ServeError::NotFound)?;
76 listing.remove(base)?;
77
78 log::info!("removed listing: {} {}", prefix, base);
79
80 if listing.is_empty() {
81 log::info!("removed prefix: {}", prefix);
82 state.active.remove(prefix);
83 state.writer.remove(prefix);
84 }
85
86 Ok(())
87 }
88
89 pub fn subscribe(&mut self, name: &str) -> Option<ListingReader> {
90 self.reader.subscribe(name).map(ListingReader::new)
91 }
92
93 pub fn tracks(&self) -> TracksReader {
94 self.reader.clone()
95 }
96
97 pub fn prefix(path: &str) -> (&str, &str) {
101 match path.rfind('.') {
103 Some(index) => (&path[..index + 1], &path[index + 1..]),
104 None => ("", path),
105 }
106 }
107}
108
/// Handle returned by [`Listings::register`] for a single registered path.
///
/// Dropping it removes `base` from the listing stored under `prefix`.
pub struct Registration {
    /// Clone of the [`Listings`] handle the entry was registered with.
    listing: Listings,
    /// Prefix of the registered path, with the namespace stripped.
    prefix: String,
    /// Final component of the registered path (the part after the last `.`).
    base: String,
}
115
116impl Drop for Registration {
117 fn drop(&mut self) {
118 self.listing.remove(&self.prefix, &self.base).ok();
119 }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `Listings::prefix` against paths with and without a leading dot,
    /// with and without a trailing dot, and at several depths.
    #[test]
    fn test_bucket() {
        // (input path, expected (prefix, base))
        let cases = [
            // Paths that start with a dot.
            (".", (".", "")),
            (".foo", (".", "foo")),
            (".foo.", (".foo.", "")),
            (".foo.bar", (".foo.", "bar")),
            (".foo.bar.", (".foo.bar.", "")),
            (".foo.bar.baz", (".foo.bar.", "baz")),
            (".foo.bar.baz.", (".foo.bar.baz.", "")),
            // Paths without a leading dot.
            ("", ("", "")),
            ("foo", ("", "foo")),
            ("foo.", ("foo.", "")),
            ("foo.bar", ("foo.", "bar")),
            ("foo.bar.", ("foo.bar.", "")),
            ("foo.bar.baz", ("foo.bar.", "baz")),
            ("foo.bar.baz.", ("foo.bar.baz.", "")),
        ];

        for &(path, expected) in cases.iter() {
            assert_eq!(Listings::prefix(path), expected, "path: {:?}", path);
        }
    }
}