use std::{
  collections::HashMap,
  sync::{Arc, Mutex},
};
#[cfg(feature = "security")]
use std::path::{PathBuf, Path};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use mio::Evented;
use serde::{de::DeserializeOwned, Serialize};
use rustdds::{
  dds::CreateResult,
  no_key::{DeserializerAdapter, SerializerAdapter},
  policy::*,
  *,
};
use crate::{
  builtin_topics,
  gid::Gid,
  node::{Node, NodeOptions},
  node_entities_info::NodeEntitiesInfo,
  participant_entities_info::ParticipantEntitiesInfo,
  pubsub::{Publisher, Subscription},
};
lazy_static! {
  pub static ref DEFAULT_QOS: QosPolicies = QosPolicyBuilder::new()
    .durability(Durability::Volatile)
    .deadline(Deadline(Duration::DURATION_INFINITE))
    .ownership(Ownership::Shared)
    .reliability(Reliability::BestEffort)
    .history(History::KeepLast { depth: 1 })
    .lifespan(Lifespan {
      duration: Duration::DURATION_INFINITE
    })
    .build();
}
#[cfg(feature = "security")]
struct SecurityConfig {
  security_config_dir: PathBuf,
  private_key_password: String,
}
pub struct ContextOptions {
  domain_id: u16,
  #[cfg(feature = "security")]
  security_config: Option<SecurityConfig>,
}
impl ContextOptions {
  pub fn new() -> Self {
    Self {
      domain_id: 0,
      #[cfg(feature = "security")]
      security_config: None,
    }
  }
  pub fn domain_id(mut self, domain_id: u16) -> Self {
    self.domain_id = domain_id;
    self
  }
  #[cfg(feature = "security")]
  pub fn enable_security(mut self, security_config_dir: impl AsRef<Path>, 
    private_key_password: String) -> Self 
  {
    self.security_config = 
      Some(SecurityConfig{ 
        security_config_dir: security_config_dir.as_ref().to_path_buf() , 
        private_key_password });
    self
  }
}
impl Default for ContextOptions {
  fn default() -> Self {
    Self::new()
  }
}
#[derive(Clone)]
pub struct Context {
  inner: Arc<Mutex<ContextInner>>,
}
impl Context {
  pub fn new() -> CreateResult<Context> {
    Self::from_domain_participant(DomainParticipant::new(0)?)
  }
  pub fn with_options(opt: ContextOptions) -> CreateResult<Context> {
    #[allow(unused_mut)] let mut dpb =  DomainParticipantBuilder::new( opt.domain_id );
    #[cfg(feature = "security")]
    {
      if let Some(sc) = opt.security_config {
        dpb = 
          dpb.builtin_security( 
            DomainParticipantSecurityConfigFiles::with_ros_default_names(
              sc.security_config_dir,
              sc.private_key_password,
            ) 
          );
      }
    }
    Self::from_domain_participant(dpb.build()?)
  }   
  pub fn from_domain_participant(domain_participant: DomainParticipant) -> CreateResult<Context> {
    let i = ContextInner::from_domain_participant(domain_participant)?;
    Ok(Context {
      inner: Arc::new(Mutex::new(i)),
    })
  }
  pub fn new_node(&self, name: &str, namespace: &str, options: NodeOptions) -> CreateResult<Node> {
    Node::new(name, namespace, options, self.clone())
  }
  pub fn create_publisher<M>(
    &self,
    topic: &Topic,
    qos: Option<QosPolicies>,
  ) -> dds::CreateResult<Publisher<M>>
  where
    M: Serialize,
  {
    let datawriter = self
      .get_ros_default_publisher()
      .create_datawriter_no_key(topic, qos)?;
    Ok(Publisher::new(datawriter))
  }
  pub fn create_subscription<M>(
    &self,
    topic: &Topic,
    qos: Option<QosPolicies>,
  ) -> dds::CreateResult<Subscription<M>>
  where
    M: 'static + DeserializeOwned,
  {
    let datareader = self
      .get_ros_default_subscriber()
      .create_simple_datareader_no_key(topic, qos)?;
    Ok(Subscription::new(datareader))
  }
  pub(crate) fn create_datawriter<M, SA>(
    &self,
    topic: &Topic,
    qos: Option<QosPolicies>,
  ) -> dds::CreateResult<no_key::DataWriter<M, SA>>
  where
    SA: SerializerAdapter<M>,
  {
    self
      .get_ros_default_publisher()
      .create_datawriter_no_key(topic, qos)
  }
  pub fn create_simpledatareader<M, DA>(
    &self,
    topic: &Topic,
    qos: Option<QosPolicies>,
  ) -> dds::CreateResult<no_key::SimpleDataReader<M, DA>>
  where
    M: 'static,
    DA: 'static + DeserializerAdapter<M>,
  {
    self
      .get_ros_default_subscriber()
      .create_simple_datareader_no_key(topic, qos)
  }
  pub fn handle_node_read(&mut self) -> Vec<ParticipantEntitiesInfo> {
    self.inner.lock().unwrap().handle_node_read()
  }
  pub fn clear(&mut self) {
    self.inner.lock().unwrap().clear();
  }
  pub fn domain_id(&self) -> u16 {
    self.inner.lock().unwrap().domain_participant.domain_id()
  }
  pub fn discovered_topics(&self) -> Vec<rustdds::discovery::DiscoveredTopicData> {
    self.domain_participant().discovered_topics()
  }
  pub fn add_node_info(&mut self, node_info: NodeEntitiesInfo) {
    self.inner.lock().unwrap().add_node_info(node_info);
  }
  pub fn remove_node_info(&mut self, node_info: &NodeEntitiesInfo) {
    self.inner.lock().unwrap().remove_node_info(node_info);
  }
  pub fn get_all_discovered_external_ros_node_infos(&self) -> HashMap<Gid, Vec<NodeEntitiesInfo>> {
    self.inner.lock().unwrap().external_nodes.clone()
  }
  pub fn get_all_discovered_local_ros_node_infos(&self) -> HashMap<String, NodeEntitiesInfo> {
    self.inner.lock().unwrap().nodes.clone()
  }
  pub fn get_ros_participant_info(&self) -> ParticipantEntitiesInfo {
    self.inner.lock().unwrap().get_ros_participant_info()
  }
  pub fn get_parameter_events_topic(&self) -> Topic {
    self
      .inner
      .lock()
      .unwrap()
      .ros_parameter_events_topic
      .clone()
  }
  pub fn get_rosout_topic(&self) -> Topic {
    self.inner.lock().unwrap().ros_rosout_topic.clone()
  }
  pub fn domain_participant(&self) -> DomainParticipant {
    self.inner.lock().unwrap().domain_participant.clone()
  }
  fn get_ros_default_publisher(&self) -> rustdds::Publisher {
    self.inner.lock().unwrap().ros_default_publisher.clone()
  }
  fn get_ros_default_subscriber(&self) -> rustdds::Subscriber {
    self.inner.lock().unwrap().ros_default_subscriber.clone()
  }
}
struct ContextInner {
  nodes: HashMap<String, NodeEntitiesInfo>,
  external_nodes: HashMap<Gid, Vec<NodeEntitiesInfo>>,
  node_reader: no_key::DataReaderCdr<ParticipantEntitiesInfo>,
  node_writer: no_key::DataWriterCdr<ParticipantEntitiesInfo>,
  domain_participant: DomainParticipant,
  #[allow(dead_code)]
  ros_discovery_topic: Topic,
  ros_default_publisher: rustdds::Publisher,
  ros_default_subscriber: rustdds::Subscriber,
  ros_parameter_events_topic: Topic,
  ros_rosout_topic: Topic,
}
impl ContextInner {
  pub fn from_domain_participant(
    domain_participant: DomainParticipant,
  ) -> CreateResult<ContextInner> {
    let ros_discovery_topic = domain_participant.create_topic(
      builtin_topics::ros_discovery::TOPIC_NAME.to_string(),
      builtin_topics::ros_discovery::TYPE_NAME.to_string(),
      &builtin_topics::ros_discovery::QOS_PUB,
      TopicKind::NoKey,
    )?;
    let ros_default_publisher = domain_participant.create_publisher(&DEFAULT_QOS)?;
    let ros_default_subscriber = domain_participant.create_subscriber(&DEFAULT_QOS)?;
    let ros_parameter_events_topic = domain_participant.create_topic(
      builtin_topics::parameter_events::TOPIC_NAME.to_string(),
      builtin_topics::parameter_events::TYPE_NAME.to_string(),
      &builtin_topics::parameter_events::QOS,
      TopicKind::NoKey,
    )?;
    let ros_rosout_topic = domain_participant.create_topic(
      builtin_topics::rosout::TOPIC_NAME.to_string(),
      builtin_topics::rosout::TYPE_NAME.to_string(),
      &builtin_topics::rosout::QOS,
      TopicKind::NoKey,
    )?;
    let node_reader =
      ros_default_subscriber.create_datareader_no_key(&ros_discovery_topic, None)?;
    let node_writer = ros_default_publisher.create_datawriter_no_key(&ros_discovery_topic, None)?;
    Ok(ContextInner {
      nodes: HashMap::new(),
      external_nodes: HashMap::new(),
      node_reader,
      node_writer,
      domain_participant,
      ros_discovery_topic,
      ros_default_publisher,
      ros_default_subscriber,
      ros_parameter_events_topic,
      ros_rosout_topic,
    })
  }
  pub fn get_ros_participant_info(&self) -> ParticipantEntitiesInfo {
    ParticipantEntitiesInfo::new(
      Gid::from_guid(self.domain_participant.guid()),
      self.nodes.values().cloned().collect(),
    )
  }
  fn add_node_info(&mut self, mut node_info: NodeEntitiesInfo) {
    node_info.add_reader(Gid::from_guid(self.node_reader.guid()));
    node_info.add_writer(Gid::from_guid(self.node_writer.guid()));
    self.nodes.insert(node_info.get_full_name(), node_info);
    self.broadcast_node_infos();
  }
  fn remove_node_info(&mut self, node_info: &NodeEntitiesInfo) {
    self.nodes.remove(&node_info.get_full_name());
    self.broadcast_node_infos();
  }
  pub fn clear(&mut self) {
    if !self.nodes.is_empty() {
      self.nodes.clear();
      self.broadcast_node_infos();
    }
  }
  fn broadcast_node_infos(&self) {
    match self
      .node_writer
      .write(self.get_ros_participant_info(), None)
    {
      Ok(_) => (),
      Err(e) => error!("Failed to write into node_writer {:?}", e),
    }
  }
  pub fn handle_node_read(&mut self) -> Vec<ParticipantEntitiesInfo> {
    let mut pts = Vec::new();
    while let Ok(Some(sample)) = self.node_reader.take_next_sample() {
      let rpi = sample.value();
      match self.external_nodes.get_mut(&rpi.guid()) {
        Some(rpi2) => {
          *rpi2 = rpi.nodes().clone();
        }
        None => {
          self.external_nodes.insert(rpi.guid(), rpi.nodes().clone());
        }
      };
      pts.push(rpi.clone());
    }
    pts
  }
  }
impl Evented for Context {
  fn register(
    &self,
    poll: &mio::Poll,
    token: mio::Token,
    interest: mio::Ready,
    opts: mio::PollOpt,
  ) -> std::io::Result<()> {
    poll.register(
      &self.inner.lock().unwrap().node_reader,
      token,
      interest,
      opts,
    )
  }
  fn reregister(
    &self,
    poll: &mio::Poll,
    token: mio::Token,
    interest: mio::Ready,
    opts: mio::PollOpt,
  ) -> std::io::Result<()> {
    poll.reregister(
      &self.inner.lock().unwrap().node_reader,
      token,
      interest,
      opts,
    )
  }
  fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> {
    poll.deregister(&self.inner.lock().unwrap().node_reader)
  }
}