use crate::define::*;

use crate::relay::SourceRelay;
use crate::source::Source;

use anyhow::{Result, anyhow, bail};
use derive_builder::Builder;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;

/// Handle to the control loop started by `run_with`.
///
/// It owns the caller end of the channel pair produced by `Ctrl::actor()`;
/// the receiving end is consumed by the spawned control task.
pub struct Mng {
    /// Caller end of the control channel.
    ctrl_tx: CtrlCaller,
}
impl Mng {
    /// Borrows the control caller held by this handle.
    pub fn ctrl(&self) -> &CtrlCaller {
        &self.ctrl_tx
    }
}
/// State shared by every command handled by the control loop.
#[derive(Clone)]
struct Inner {
    /// Registered sources, keyed by source id.
    sources: Arc<RwLock<HashMap<String, Arc<Source>>>>,
    /// Registered relays, keyed by relay id.
    relays: Arc<RwLock<HashMap<String, SourceRelay>>>,
    /// Event sender; a clone is handed to each `Source` created by `add_source`.
    event_tx: SourceEventSender,
    /// Configuration passed by reference to `Source::new`.
    config: SSConfig,
}
impl Inner {
    pub fn new(event_tx: SourceEventSender, config: SSConfig) -> Arc<Inner> {
        Arc::new(Self {
            event_tx,
            config,
            sources: Arc::new(RwLock::new(HashMap::new())),
            relays: Arc::new(RwLock::new(HashMap::new())),
        })
    }
}

impl Inner {
    /// Returns a clone of the relay registered under `relay_id`.
    ///
    /// # Errors
    ///
    /// Returns `SourceSSError::RelayNofound` (converted into `anyhow::Error`)
    /// when no relay with that id is registered.
    fn get_relay(&self, relay_id: &str) -> Result<SourceRelay> {
        self.relays
            .read()
            .get(relay_id)
            .cloned()
            // Build the error lazily: the id is only copied on a miss.
            .ok_or_else(|| SourceSSError::RelayNofound(relay_id.to_string()).into())
    }

    /// Returns a shared handle to the source registered under `source_id`.
    ///
    /// # Errors
    ///
    /// Returns `SourceSSError::SourceNofound` (converted into `anyhow::Error`)
    /// when no source with that id is registered.
    fn get_source(&self, source_id: &str) -> Result<Arc<Source>> {
        self.sources
            .read()
            .get(source_id)
            .cloned()
            // Build the error lazily: the id is only copied on a miss.
            .ok_or_else(|| SourceSSError::SourceNofound(source_id.to_string()).into())
    }

    /// Subscribes to the relay registered under `relay_id`.
    ///
    /// Resolves the relay, then the source it refers to, and subscribes to
    /// that source using the relay's scheme key. The `have_relay` flag is
    /// sampled before the subscription is made.
    ///
    /// # Errors
    ///
    /// Fails when the relay or its source is not registered, or when the
    /// source rejects the subscription.
    async fn subscribe(&self, relay_id: &str) -> Result<RelaySubject> {
        let relay = self.get_relay(relay_id)?;
        let source_id = relay.source_id().to_string();
        let source = self.get_source(&source_id)?;

        let have_relay = source.have_relay();
        let relay_rx = Some(source.subscribe(&relay.scheme_key()).await?);

        // `relay` is already an owned copy taken from the table, so it is
        // moved into the result instead of being cloned a second time.
        Ok(RelaySubject {
            have_relay,
            relay_rx,
            relay,
        })
    }

    /// Returns the input sender of the source addressed by `sub_id`.
    ///
    /// A `SubId::SourceId` names the source directly; a `SubId::RelayId` is
    /// first resolved to its relay, whose source id is then looked up.
    ///
    /// # Errors
    ///
    /// Fails when the relay or source is not registered, or when the source
    /// cannot provide an input sender.
    fn subscribe_input(&self, sub_id: SubId) -> Result<IndexPacketSender> {
        let target = match sub_id {
            SubId::RelayId(relay_id) => {
                let owner_id = self.get_relay(&relay_id)?.source_id().to_string();
                self.get_source(&owner_id)?
            },
            SubId::SourceId(source_id) => self.get_source(&source_id)?,
        };

        Ok(target.subscribe_input()?)
    }

    async fn add_source(&self, source_relay: &SourceRelay) -> Result<()> {
        let source_id = source_relay.source_id().to_string();

        if self.sources.read().contains_key(&source_id) {
            let source = self
                .sources
                .read()
                .get(&source_id)
                .ok_or_else(|| anyhow::anyhow!("source no found"))?
                .clone();
            source.add_scheme(source_relay).await?;
            match self.relays.write().entry(source_relay.relay_id().to_owned()) {
                Entry::Occupied(_) => Err(anyhow!("relay {} already exists", source_relay.relay_id())),
                Entry::Vacant(entry) => {
                    entry.insert(source_relay.to_owned());
                    Ok(())
                },
            }
        } else {
            match source_relay.scheme() {
                SSScheme::MpegTs
                | SSScheme::Flv
                | SSScheme::Hls
                | SSScheme::Fmp4
                | SSScheme::Es
                | SSScheme::Rtp
                | SSScheme::EnhanceFlv
                | SSScheme::Ps
                | SSScheme::Data
                | SSScheme::Blob
                | SSScheme::G711a
                | SSScheme::Mp2p => {
                    let source = Source::new(self.event_tx.clone(), source_relay.source_id(), &self.config).await?;
                    source.add_scheme(source_relay).await?;
                    self.sources.write().insert(source_id.clone(), Arc::new(source));

                    match self.relays.write().entry(source_relay.relay_id().to_owned()) {
                        Entry::Occupied(_) => {
                            // this will never happen
                            Err(anyhow!("relay {} have already exists ", source_relay.relay_id()))
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(source_relay.to_owned());
                            Ok(())
                        },
                    }
                },
                SSScheme::Unkown => bail!("add source fail , error: Unkown scheme "),
            }
        }
    }
    /// Forwards a fetch request to the source behind the relay named in
    /// `args`.
    ///
    /// The returned `RelaySubject` carries no receiver (`relay_rx` is
    /// `None`). The `have_relay` flag is sampled before the fetch is issued.
    ///
    /// # Errors
    ///
    /// Fails when the relay or its source is not registered, or when the
    /// source's `fetch` call fails.
    async fn fetch_m3u8(&self, args: FetchM3u8Args) -> Result<RelaySubject> {
        // Copy the borrowed fields out before `into_res` consumes `args`.
        let relay_id = args.relay_id().to_string();
        let resource_name = args.resource_name().to_string();
        let res = args.into_res();

        let relay = self.get_relay(&relay_id)?;
        let source_id = relay.source_id().to_string();
        let source = self.get_source(&source_id)?;

        let have_relay = source.have_relay();
        source.fetch(&relay.scheme_key(), res, &resource_name).await?;

        // `relay` is already an owned copy taken from the table, so it is
        // moved into the result instead of being cloned a second time.
        Ok(RelaySubject {
            relay_rx: None,
            relay,
            have_relay,
        })
    }

    fn remove_source(&self, source_id: &str) -> Result<()> {
        let expire_source = self
            .sources
            .write()
            .remove(source_id)
            .ok_or_else(|| anyhow!("remove source fail, error: source_id:{} no found", source_id))?;
        drop(expire_source);

        self.relays.write().retain(|_, relay| relay.source_id() != source_id);
        Ok(())
    }
}

/// Arguments consumed by `run_with`.
///
/// The `Builder` derive generates `SourceSSRunWithArgsBuilder`, which `run`
/// uses to assemble this value.
#[derive(Builder)]
pub struct SourceSSRunWithArgs {
    /// Configuration handed to the shared state.
    config: SSConfig,
    /// Event sender handed to the shared state.
    event_tx: SourceEventSender,
}

pub fn run_with(args: SourceSSRunWithArgs) -> Result<Mng> {
    let event_tx = args.event_tx;
    let config = args.config;

    let (ctrl_tx, mut ctrl_rx) = Ctrl::actor();

    let inner = Inner::new(event_tx, config);
    tokio::spawn(async move {
        while let Some(c) = ctrl_rx.recv().await {
            let inner = inner.clone();
            tokio::spawn(async move {
                match c {
                    Ctrl::AddRelaySource(actor_pkt) => {
                        let result = inner.add_source(actor_pkt.args()).await;
                        let _ = actor_pkt.return_(result);
                    },
                    Ctrl::Subscribe(actor_pkt) => {
                        let result = inner.subscribe(actor_pkt.args()).await;
                        let _ = actor_pkt.return_(result);
                    },
                    Ctrl::SubscribeInput(actor_pkt) => {
                        let (args, ret) = actor_pkt.split();

                        let _ = ret.return_(inner.subscribe_input(args));
                    },
                    Ctrl::RemoveRelaySource(actor_pkt) => {
                        let result = inner.remove_source(actor_pkt.args());
                        let _ = actor_pkt.return_(result);
                    },
                    Ctrl::FetchM3u8(actor_pkt) => {
                        let (args, ret) = actor_pkt.split();
                        let result = inner.fetch_m3u8(args).await;
                        let _ = ret.return_(result);
                    },
                }
            });
        }
    });

    Ok(Mng { ctrl_tx })
}
/// Convenience entry point: assembles `SourceSSRunWithArgs` from an event
/// sender and a copy of `config`, then delegates to `run_with`.
///
/// # Errors
///
/// Propagates a builder failure or any error returned by `run_with`.
pub fn run(event_tx: SourceEventSender, config: &SSConfig) -> Result<Mng> {
    run_with(
        SourceSSRunWithArgsBuilder::default()
            .event_tx(event_tx)
            .config(config.clone())
            .build()?,
    )
}