use crate::define::*;
use crate::relay::SourceRelay;
use crate::source::Source;
use anyhow::{Result, anyhow, bail};
use derive_builder::Builder;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
/// Handle to a running source manager, as returned by `run` / `run_with`.
pub struct Mng {
// Caller half of the control channel created by `Ctrl::actor()` in `run_with`.
ctrl_tx: CtrlCaller,
}
impl Mng {
/// Borrows the control-channel caller used to send `Ctrl` commands to the
/// manager's control loop.
pub fn ctrl(&self) -> &CtrlCaller {
&self.ctrl_tx
}
}
/// Shared state used by the control loop: the source and relay registries plus
/// the values needed to create new sources.
#[derive(Clone)]
struct Inner {
/// Live sources, keyed by source id.
sources: Arc<RwLock<HashMap<String, Arc<Source>>>>,
/// Registered relays, keyed by relay id.
relays: Arc<RwLock<HashMap<String, SourceRelay>>>,
/// Event sender; a clone is passed to every `Source::new` call.
event_tx: SourceEventSender,
/// Configuration passed by reference to `Source::new`.
config: SSConfig,
}
impl Inner {
pub fn new(event_tx: SourceEventSender, config: SSConfig) -> Arc<Inner> {
Arc::new(Self {
event_tx,
config,
sources: Arc::new(RwLock::new(HashMap::new())),
relays: Arc::new(RwLock::new(HashMap::new())),
})
}
}
impl Inner {
    /// Returns a clone of the relay registered under `relay_id`.
    ///
    /// # Errors
    /// Returns `SourceSSError::RelayNofound` when no relay has that id.
    fn get_relay(&self, relay_id: &str) -> Result<SourceRelay> {
        self.relays
            .read()
            .get(relay_id)
            .cloned()
            // Build the error lazily so a successful lookup does not allocate.
            .ok_or_else(|| SourceSSError::RelayNofound(relay_id.to_string()))
            .map_err(Into::into)
    }

    /// Returns a shared handle to the source registered under `source_id`.
    ///
    /// # Errors
    /// Returns `SourceSSError::SourceNofound` when no source has that id.
    fn get_source(&self, source_id: &str) -> Result<Arc<Source>> {
        self.sources
            .read()
            .get(source_id)
            .cloned()
            // Build the error lazily so a successful lookup does not allocate.
            .ok_or_else(|| SourceSSError::SourceNofound(source_id.to_string()))
            .map_err(Into::into)
    }

    /// Subscribes to the scheme of relay `relay_id` on the source that relay
    /// belongs to.
    ///
    /// The returned `RelaySubject` carries the relay, the receiver produced by
    /// `Source::subscribe`, and the source's `have_relay()` value sampled
    /// before subscribing.
    ///
    /// # Errors
    /// Fails when the relay or its source is not registered, or when the
    /// source rejects the subscription.
    async fn subscribe(&self, relay_id: &str) -> Result<RelaySubject> {
        let relay = self.get_relay(relay_id)?;
        let source = self.get_source(&relay.source_id().to_string())?;
        let have_relay = source.have_relay();
        let relay_rx = Some(source.subscribe(&relay.scheme_key()).await?);
        // `relay` is already an owned clone out of the registry; move it.
        Ok(RelaySubject {
            have_relay,
            relay_rx,
            relay,
        })
    }

    /// Resolves `sub_id` (a source id, or a relay id mapped to its source) and
    /// returns that source's input sender.
    ///
    /// # Errors
    /// Fails when the referenced relay or source is not registered, or when
    /// `Source::subscribe_input` fails.
    fn subscribe_input(&self, sub_id: SubId) -> Result<IndexPacketSender> {
        let source = match sub_id {
            SubId::SourceId(source_id) => self.get_source(&source_id)?,
            SubId::RelayId(relay_id) => {
                let relay = self.get_relay(&relay_id)?;
                self.get_source(&relay.source_id().to_string())?
            },
        };
        let input_data = source.subscribe_input()?;
        Ok(input_data)
    }

    /// Registers `source_relay`, creating its source first when the source id
    /// is not registered yet.
    ///
    /// A duplicate relay id is rejected *before* anything is mutated, so a
    /// failed call no longer leaves an extra scheme on an existing source or
    /// an orphaned, freshly created source in the registry.
    ///
    /// # Errors
    /// Fails when the relay id is already registered, when a new source would
    /// be created for `SSScheme::Unkown`, or when `Source::new` /
    /// `Source::add_scheme` fail.
    async fn add_source(&self, source_relay: &SourceRelay) -> Result<()> {
        let source_id = source_relay.source_id().to_string();
        let relay_id = source_relay.relay_id().to_owned();

        // Single lookup; the read guard is released at the end of this
        // statement, i.e. before any `.await` below.
        let existing = self.sources.read().get(&source_id).cloned();

        if let Some(source) = existing {
            // NOTE(review): the scheme is not validated on this path (same as
            // before); presumably `add_scheme` copes with it — confirm.
            if self.relays.read().contains_key(&relay_id) {
                return Err(anyhow!("relay {} already exists", source_relay.relay_id()));
            }
            source.add_scheme(source_relay).await?;
            // The entry check stays as a guard against a concurrent
            // registration of the same relay id while we were awaiting.
            return match self.relays.write().entry(relay_id) {
                Entry::Occupied(_) => Err(anyhow!("relay {} already exists", source_relay.relay_id())),
                Entry::Vacant(slot) => {
                    slot.insert(source_relay.to_owned());
                    Ok(())
                },
            };
        }

        // Exhaustive on purpose: a new `SSScheme` variant must be classified here.
        match source_relay.scheme() {
            SSScheme::MpegTs
            | SSScheme::Flv
            | SSScheme::Hls
            | SSScheme::Fmp4
            | SSScheme::Es
            | SSScheme::Rtp
            | SSScheme::EnhanceFlv
            | SSScheme::Ps
            | SSScheme::Data
            | SSScheme::Blob
            | SSScheme::G711a
            | SSScheme::Mp2p => {},
            SSScheme::Unkown => bail!("add source fail , error: Unkown scheme "),
        }

        if self.relays.read().contains_key(&relay_id) {
            return Err(anyhow!("relay {} have already exists ", source_relay.relay_id()));
        }

        let created = Arc::new(Source::new(self.event_tx.clone(), source_relay.source_id(), &self.config).await?);
        created.add_scheme(source_relay).await?;

        // Commands are handled concurrently, so another task may have
        // registered this source id while we were awaiting. Keep whichever
        // source got there first instead of overwriting it (which would strand
        // the relays already attached to it).
        let winner = self
            .sources
            .write()
            .entry(source_id)
            .or_insert_with(|| Arc::clone(&created))
            .clone();
        if !Arc::ptr_eq(&winner, &created) {
            // We lost the race: attach the scheme to the registered source.
            winner.add_scheme(source_relay).await?;
        }

        match self.relays.write().entry(relay_id) {
            Entry::Occupied(_) => Err(anyhow!("relay {} have already exists ", source_relay.relay_id())),
            Entry::Vacant(slot) => {
                slot.insert(source_relay.to_owned());
                Ok(())
            },
        }
    }

    /// Asks the source behind `args.relay_id()` to fetch `args.resource_name()`
    /// for the relay's scheme, passing along the value from `args.into_res()`.
    ///
    /// The returned `RelaySubject` has no receiver (`relay_rx` is `None`).
    ///
    /// # Errors
    /// Fails when the relay or its source is not registered, or when
    /// `Source::fetch` fails.
    async fn fetch_m3u8(&self, args: FetchM3u8Args) -> Result<RelaySubject> {
        // Copy the borrowed fields out before `into_res` consumes `args`.
        let relay_id = args.relay_id().to_string();
        let resource_name = args.resource_name().to_string();
        let res = args.into_res();

        let relay = self.get_relay(&relay_id)?;
        let source = self.get_source(&relay.source_id().to_string())?;
        let have_relay = source.have_relay();
        source.fetch(&relay.scheme_key(), res, &resource_name).await?;
        Ok(RelaySubject {
            relay_rx: None,
            relay,
            have_relay,
        })
    }

    /// Unregisters the source `source_id` together with every relay that
    /// points at it.
    ///
    /// # Errors
    /// Fails when no source with that id is registered.
    fn remove_source(&self, source_id: &str) -> Result<()> {
        let expire_source = self
            .sources
            .write()
            .remove(source_id)
            .ok_or_else(|| anyhow!("remove source fail, error: source_id:{} no found", source_id))?;
        // Release the registry's handle before the relay table is locked.
        drop(expire_source);
        self.relays.write().retain(|_, relay| relay.source_id() != source_id);
        Ok(())
    }
}
/// Arguments for `run_with`. The `Builder` derive generates
/// `SourceSSRunWithArgsBuilder`, which `run` uses to assemble this struct.
#[derive(Builder)]
pub struct SourceSSRunWithArgs {
/// Configuration handed to the manager's shared state.
config: SSConfig,
/// Event sender handed to the manager's shared state.
event_tx: SourceEventSender,
}
pub fn run_with(args: SourceSSRunWithArgs) -> Result<Mng> {
let event_tx = args.event_tx;
let config = args.config;
let (ctrl_tx, mut ctrl_rx) = Ctrl::actor();
let inner = Inner::new(event_tx, config);
tokio::spawn(async move {
while let Some(c) = ctrl_rx.recv().await {
let inner = inner.clone();
tokio::spawn(async move {
match c {
Ctrl::AddRelaySource(actor_pkt) => {
let result = inner.add_source(actor_pkt.args()).await;
let _ = actor_pkt.return_(result);
},
Ctrl::Subscribe(actor_pkt) => {
let result = inner.subscribe(actor_pkt.args()).await;
let _ = actor_pkt.return_(result);
},
Ctrl::SubscribeInput(actor_pkt) => {
let (args, ret) = actor_pkt.split();
let _ = ret.return_(inner.subscribe_input(args));
},
Ctrl::RemoveRelaySource(actor_pkt) => {
let result = inner.remove_source(actor_pkt.args());
let _ = actor_pkt.return_(result);
},
Ctrl::FetchM3u8(actor_pkt) => {
let (args, ret) = actor_pkt.split();
let result = inner.fetch_m3u8(args).await;
let _ = ret.return_(result);
},
}
});
}
});
Ok(Mng { ctrl_tx })
}
/// Convenience wrapper around `run_with`: builds the arguments from `event_tx`
/// and a clone of `config`, then starts the manager.
///
/// # Errors
/// Propagates a failure from the arguments builder or from `run_with`.
pub fn run(event_tx: SourceEventSender, config: &SSConfig) -> Result<Mng> {
    run_with(
        SourceSSRunWithArgsBuilder::default()
            .config(config.clone())
            .event_tx(event_tx)
            .build()?,
    )
}