#![allow(clippy::mutable_key_type)] 
use crate::event::NetworkEvent;
use libp2p::{
    identity::PeerId,
    kad::{
        store::{Error, RecordStore, Result},
        KBucketDistance as Distance, KBucketKey, ProviderRecord, Record, RecordKey as Key,
    },
};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::gauge::Gauge;
use sn_protocol::{
    storage::{RecordHeader, RecordKind, RecordType},
    NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::NanoTokens;
use std::{
    borrow::Cow,
    collections::{HashMap, HashSet},
    fs,
    path::{Path, PathBuf},
    vec,
};
use tokio::sync::mpsc;
use xor_name::XorName;
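/// Max number of records a node can store.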
const MAX_RECORDS_COUNT: usize = 2048;
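/// A disk-backed `RecordStore`: record keys are indexed in memory while record
/// values are written to files under the configured storage directory.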
pub struct NodeRecordStore {
    /// The identity of the peer owning this store.
    local_key: KBucketKey<PeerId>,
    /// The configuration of the store.
    config: NodeRecordStoreConfig,
    /// In-memory index of the records held on disk, keyed by record key.
    records: HashMap<Key, (NetworkAddress, RecordType)>,
    /// Channel for sending `NetworkEvent`s up to the node layer.
    event_sender: Option<mpsc::Sender<NetworkEvent>>,
    /// Distance range within which records count as "relevant" for store-cost
    /// calculation; `None` until a range is set.
    distance_range: Option<Distance>,
    #[cfg(feature = "open-metrics")]
    /// Reports the number of stored records to the metrics server.
    record_count_metric: Option<Gauge>,
}
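/// Configuration for a `NodeRecordStore`.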
#[derive(Debug, Clone)]
pub struct NodeRecordStoreConfig {
    /// The directory where record values are written.
    pub storage_dir: PathBuf,
    /// The maximum number of records the store will hold.
    pub max_records: usize,
    /// The maximum size, in bytes, of a record value.
    pub max_value_bytes: usize,
}
impl Default for NodeRecordStoreConfig {
    fn default() -> Self {
        Self {
            storage_dir: std::env::temp_dir(),
            max_records: MAX_RECORDS_COUNT,
            max_value_bytes: 65 * 1024,
        }
    }
}
impl NodeRecordStore {
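    /// Creates a new `NodeRecordStore` with the given peer identity, config and
    /// an optional channel for sending `NetworkEvent`s to the node layer.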
    pub fn with_config(
        local_id: PeerId,
        config: NodeRecordStoreConfig,
        event_sender: Option<mpsc::Sender<NetworkEvent>>,
    ) -> Self {
        NodeRecordStore {
            local_key: KBucketKey::from(local_id),
            config,
            records: Default::default(),
            event_sender,
            distance_range: None,
            #[cfg(feature = "open-metrics")]
            record_count_metric: None,
        }
    }
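    /// Sets a gauge that reports the number of stored records to the metrics
    /// server.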
    #[cfg(feature = "open-metrics")]
    pub fn set_record_count_metric(mut self, metric: Gauge) -> Self {
        self.record_count_metric = Some(metric);
        self
    }
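    /// Hex-encodes a record key for use as its on-disk filename.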
    fn key_to_hex(key: &Key) -> String {
        let key_bytes = key.as_ref();
        let mut hex_string = String::with_capacity(key_bytes.len() * 2);
        for byte in key_bytes {
            hex_string.push_str(&format!("{byte:02x}"));
        }
        hex_string
    }
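    /// Reads a record's value back from disk (the filename is the hex-encoded
    /// key) and reassembles the `Record`; returns `None` on any read error.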
    fn read_from_disk<'a>(key: &Key, storage_dir: &Path) -> Option<Cow<'a, Record>> {
        let start = std::time::Instant::now();
        let filename = Self::key_to_hex(key);
        let file_path = storage_dir.join(&filename);
        match fs::read(file_path) {
            Ok(value) => {
                debug!(
                    "Retrieved record from disk! filename: {filename} after {:?}",
                    start.elapsed()
                );
                let record = Record {
                    key: key.clone(),
                    value,
                    publisher: None,
                    expires: None,
                };
                Some(Cow::Owned(record))
            }
            Err(err) => {
                error!("Error while reading file. filename: {filename}, error: {err:?}");
                None
            }
        }
    }
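    /// Prunes the store, when full, to make room for an incoming record.
    /// Returns `Error::MaxRecords` if the store is full and the incoming record
    /// is no closer to us than the furthest record already held.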
    fn prune_storage_if_needed_for_record(&mut self, r: &Key) -> Result<()> {
        let num_records = self.records.len();
        // The store is not yet full; nothing needs pruning.
        if num_records < self.config.max_records {
            return Ok(());
        }
        // Find the record we hold that is furthest from our own key.
        let furthest = self
            .records
            .keys()
            .max_by_key(|k| {
                let kbucket_key = KBucketKey::from(k.to_vec());
                self.local_key.distance(&kbucket_key)
            })
            .cloned();
        // Prune the furthest record only if the incoming one is closer to us;
        // otherwise the store is effectively full for this record.
        if let Some(furthest_record) = furthest {
            let furthest_record_key = KBucketKey::from(furthest_record.to_vec());
            let incoming_record_key = KBucketKey::from(r.to_vec());
            if incoming_record_key.distance(&self.local_key)
                < furthest_record_key.distance(&self.local_key)
            {
                trace!(
                    "{:?} will be pruned to make space for new record: {:?}",
                    PrettyPrintRecordKey::from(&furthest_record),
                    PrettyPrintRecordKey::from(r)
                );
                self.remove(&furthest_record);
                // Flag when pruning reaches into records that sit inside our
                // distance range, i.e. records we are meant to keep.
                if let Some(distance_range) = self.distance_range {
                    if furthest_record_key.distance(&self.local_key) < distance_range {
                        warn!("Pruned record would also be within our distance range.");
                    }
                }
            } else {
                                warn!("Record not stored (key: {r:?}). Maximum number of records reached. Current num_records: {num_records}");
                return Err(Error::MaxRecords);
            }
        }
        Ok(())
    }
}
impl NodeRecordStore {
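    /// Returns `true` if the store holds a record for the given key.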
    pub(crate) fn contains(&self, key: &Key) -> bool {
        self.records.contains_key(key)
    }
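    /// Returns the addresses and types of all records held in the store.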
    pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, RecordType> {
        self.records
            .iter()
            .map(|(_record_key, (addr, record_type))| (addr.clone(), record_type.clone()))
            .collect()
    }
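    /// Returns a reference to the underlying key -> (address, type) map.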
    #[allow(clippy::mutable_key_type)]
    pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, RecordType)> {
        &self.records
    }
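    /// Warning: PUTs a `Record` to the store without validation; only use this
    /// where the `Record` is already trusted. The value is written to disk on a
    /// background task.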
    pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> {
        let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
        trace!("PUT a verified Record: {record_key:?}");
        self.prune_storage_if_needed_for_record(&r.key)?;
        let filename = Self::key_to_hex(&r.key);
        let file_path = self.config.storage_dir.join(&filename);
        let _ = self.records.insert(
            r.key.clone(),
            (NetworkAddress::from_record_key(&r.key), record_type),
        );
        #[cfg(feature = "open-metrics")]
        if let Some(metric) = &self.record_count_metric {
            let _ = metric.set(self.records.len() as i64);
        }
        let cloned_event_sender = self.event_sender.clone();
        tokio::spawn(async move {
            match fs::write(&file_path, r.value) {
                Ok(_) => {
                    info!("Wrote record {record_key:?} to disk! filename: {filename}");
                }
                Err(err) => {
                    error!(
                        "Error writing record {record_key:?} filename: {filename}, error: {err:?}"
                    );
                    if let Some(event_sender) = cloned_event_sender {
                        if let Err(error) =
                            event_sender.send(NetworkEvent::FailedToWrite(r.key)).await
                        {
                            error!("SwarmDriver failed to send event: {}", error);
                        }
                    } else {
                        error!("Record store doesn't have event_sender could not log failed write to disk for {file_path:?}");
                    }
                }
            }
        });
        Ok(())
    }
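    /// Calculates the cost for storing the next record, driven by the number of
    /// records within our distance range; falls back to `MAX_RECORDS_COUNT` when
    /// no range has been set.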
    #[allow(clippy::mutable_key_type)]
    pub(crate) fn store_cost(&self) -> NanoTokens {
        let relevant_records_len = if let Some(distance_range) = self.distance_range {
            let record_keys: HashSet<_> = self.records.keys().cloned().collect();
            self.get_records_within_distance_range(&record_keys, distance_range)
        } else {
            warn!("No distance range set on record store. Returning MAX_RECORDS_COUNT for relevant records in store cost calculation.");
            MAX_RECORDS_COUNT
        };
        let cost = calculate_cost_for_relevant_records(relevant_records_len);
        debug!("Cost is now {cost:?}");
        NanoTokens::from(cost)
    }
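    /// Counts how many of the given record keys fall within `distance_range` of
    /// our local key.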
    #[allow(clippy::mutable_key_type)]
    pub fn get_records_within_distance_range(
        &self,
        records: &HashSet<Key>,
        distance_range: Distance,
    ) -> usize {
        debug!(
            "Total record count is {:?}. Distance is: {distance_range:?}",
            self.records.len()
        );
        let relevant_records_len = records
            .iter()
            .filter(|key| {
                let kbucket_key = KBucketKey::new(key.to_vec());
                distance_range >= self.local_key.distance(&kbucket_key)
            })
            .count();
        debug!("Relevant records len is {:?}", relevant_records_len);
        relevant_records_len
    }
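    /// Sets the distance range used to decide which records count as "relevant".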
    pub(crate) fn set_distance_range(&mut self, distance_range: Distance) {
        self.distance_range = Some(distance_range);
    }
}
impl RecordStore for NodeRecordStore {
    type RecordsIter<'a> = vec::IntoIter<Cow<'a, Record>>;
    type ProvidedIter<'a> = vec::IntoIter<Cow<'a, ProviderRecord>>;
    fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
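        // GET requests fan out across many nodes, so we may be asked for keys
        // we never stored; consult the in-memory index first to avoid touching
        // the disk for such requests.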
        let key = PrettyPrintRecordKey::from(k);
        if !self.records.contains_key(k) {
            trace!("Record not found locally: {key}");
            return None;
        }
        debug!("GET request for Record key: {key}");
        Self::read_from_disk(k, &self.config.storage_dir)
    }
    fn put(&mut self, record: Record) -> Result<()> {
        if record.value.len() >= self.config.max_value_bytes {
            warn!(
                "Record not stored. Value too large: {} bytes",
                record.value.len()
            );
            return Err(Error::ValueTooLarge);
        }
        let record_key = PrettyPrintRecordKey::from(&record.key);
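        // Records carrying payment are always processed further, so that the
        // payment itself can be handled even if we already hold the record.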
        match RecordHeader::from_record(&record) {
            Ok(record_header) => {
                match record_header.kind {
                    RecordKind::ChunkWithPayment | RecordKind::RegisterWithPayment => {
                        trace!("Record {record_key:?} with payment shall always be processed.");
                    }
                    _ => {
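                        // A chunk under an existing key is an exact duplicate
                        // and can be short-circuited; a non-chunk record is only
                        // a duplicate when its content hash is unchanged,
                        // otherwise it is passed on for validation.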
                        match self.records.get(&record.key) {
                            Some((_addr, RecordType::Chunk)) => {
                                trace!("Chunk {record_key:?} already exists.");
                                return Ok(());
                            }
                            Some((_addr, RecordType::NonChunk(existing_content_hash))) => {
                                let content_hash = XorName::from_content(&record.value);
                                if content_hash == *existing_content_hash {
                                    trace!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists.");
                                    return Ok(());
                                }
                            }
                            _ => {}
                        }
                    }
                }
            }
            Err(err) => {
                error!("For record {record_key:?}, failed to parse record_header {err:?}");
                return Ok(());
            }
        }
        trace!("Unverified Record {record_key:?} try to validate and store");
        if let Some(event_sender) = self.event_sender.clone() {
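            // Push the event off-thread so the PUT itself stays non-blocking.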
            let _handle = tokio::spawn(async move {
                if let Err(error) = event_sender
                    .send(NetworkEvent::UnverifiedRecord(record))
                    .await
                {
                    error!("SwarmDriver failed to send event: {}", error);
                }
            });
        } else {
            error!("Record store doesn't have event_sender setup");
        }
        Ok(())
    }
    fn remove(&mut self, k: &Key) {
        let _ = self.records.remove(k);
        #[cfg(feature = "open-metrics")]
        if let Some(metric) = &self.record_count_metric {
            let _ = metric.set(self.records.len() as i64);
        }
        let filename = Self::key_to_hex(k);
        let file_path = self.config.storage_dir.join(&filename);
        let _handle = tokio::spawn(async move {
            match fs::remove_file(file_path) {
                Ok(_) => {
                    info!("Removed record from disk! filename: {filename}");
                }
                Err(err) => {
                    error!("Error while removing file. filename: {filename}, error: {err:?}");
                }
            }
        });
    }
    fn records(&self) -> Self::RecordsIter<'_> {
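        // Records are kept on disk and are not enumerated through this API;
        // returning an empty iterator avoids loading every record into memory.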
        vec![].into_iter()
    }
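    // Provider records are not used by this store; the remaining trait methods
    // are deliberate no-ops.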
    fn add_provider(&mut self, _record: ProviderRecord) -> Result<()> {
        Ok(())
    }
    fn providers(&self, _key: &Key) -> Vec<ProviderRecord> {
        vec![]
    }
    fn provided(&self) -> Self::ProvidedIter<'_> {
        vec![].into_iter()
    }
    fn remove_provider(&mut self, _key: &Key, _provider: &PeerId) {}
}
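/// A placeholder `RecordStore` for the client, which has nothing to store:
/// every method is a no-op.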
#[derive(Default, Debug)]
pub struct ClientRecordStore {
    empty_record_addresses: HashMap<Key, (NetworkAddress, RecordType)>,
}
impl ClientRecordStore {
    pub(crate) fn contains(&self, _key: &Key) -> bool {
        false
    }
    pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, RecordType> {
        HashMap::new()
    }
    #[allow(clippy::mutable_key_type)]
    pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, RecordType)> {
        &self.empty_record_addresses
    }
    pub(crate) fn put_verified(&mut self, _r: Record, _record_type: RecordType) -> Result<()> {
        Ok(())
    }
    pub(crate) fn set_distance_range(&mut self, _distance_range: Distance) {}
}
impl RecordStore for ClientRecordStore {
    type RecordsIter<'a> = vec::IntoIter<Cow<'a, Record>>;
    type ProvidedIter<'a> = vec::IntoIter<Cow<'a, ProviderRecord>>;
    fn get(&self, _k: &Key) -> Option<Cow<'_, Record>> {
        None
    }
    fn put(&mut self, _record: Record) -> Result<()> {
        Ok(())
    }
    fn remove(&mut self, _k: &Key) {}
    fn records(&self) -> Self::RecordsIter<'_> {
        vec![].into_iter()
    }
    fn add_provider(&mut self, _record: ProviderRecord) -> Result<()> {
        Ok(())
    }
    fn providers(&self, _key: &Key) -> Vec<ProviderRecord> {
        vec![]
    }
    fn provided(&self) -> Self::ProvidedIter<'_> {
        vec![].into_iter()
    }
    fn remove_provider(&mut self, _key: &Key, _provider: &PeerId) {}
}
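/// Exponential pricing curve `y = a * b^step`, returned in nano tokens: every
/// additional relevant record makes the next one more expensive to store
/// (roughly 5e17 nanos by the time the 2048-record cap is reached).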
fn calculate_cost_for_relevant_records(step: usize) -> u64 {
    assert!(
        step <= MAX_RECORDS_COUNT,
        "step must be <= MAX_RECORDS_COUNT"
    );
    // Exponential growth y = a * b^step: `a` is the base charge per record (in
    // whole tokens) and `b` controls how steeply the price rises as the store fills.
    let a = 0.000_000_010_f64;
    let b = 1.019_f64;
    let y = a * b.powf(step as f64);

    // Convert the resulting token amount into nanos.
    (y * 1_000_000_000_f64) as u64
}
#[allow(trivial_casts)]
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use super::*;
    use bytes::Bytes;
    use eyre::ContextCompat;
    use libp2p::{
        core::multihash::Multihash,
        kad::{KBucketKey, RecordKey},
    };
    use quickcheck::*;
    use sn_protocol::storage::try_serialize_record;
    use tokio::runtime::Runtime;
    const MULTIHASH_CODE: u64 = 0x12;
    #[derive(Clone, Debug)]
    struct ArbitraryKey(Key);
    #[derive(Clone, Debug)]
    struct ArbitraryPeerId(PeerId);
    #[derive(Clone, Debug)]
    struct ArbitraryKBucketKey(KBucketKey<PeerId>);
    #[derive(Clone, Debug)]
    struct ArbitraryRecord(Record);
    #[derive(Clone, Debug)]
    struct ArbitraryProviderRecord(ProviderRecord);
    impl Arbitrary for ArbitraryPeerId {
        fn arbitrary(g: &mut Gen) -> ArbitraryPeerId {
            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
            let peer_id = PeerId::from_multihash(
                Multihash::wrap(MULTIHASH_CODE, &hash).expect("Failed to gen Multihash"),
            )
            .expect("Failed to create PeerId");
            ArbitraryPeerId(peer_id)
        }
    }
    impl Arbitrary for ArbitraryKBucketKey {
        fn arbitrary(_: &mut Gen) -> ArbitraryKBucketKey {
            ArbitraryKBucketKey(KBucketKey::from(PeerId::random()))
        }
    }
    impl Arbitrary for ArbitraryKey {
        fn arbitrary(g: &mut Gen) -> ArbitraryKey {
            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
            ArbitraryKey(Key::from(
                Multihash::<64>::wrap(MULTIHASH_CODE, &hash).expect("Failed to gen Multihash"),
            ))
        }
    }
    impl Arbitrary for ArbitraryRecord {
        fn arbitrary(g: &mut Gen) -> ArbitraryRecord {
            let value = match try_serialize_record(
                &(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
                RecordKind::Chunk,
            ) {
                Ok(value) => value.to_vec(),
                Err(err) => panic!("Cannot generate record value {err:?}"),
            };
            let record = Record {
                key: ArbitraryKey::arbitrary(g).0,
                value,
                publisher: None,
                expires: None,
            };
            ArbitraryRecord(record)
        }
    }
    impl Arbitrary for ArbitraryProviderRecord {
        fn arbitrary(g: &mut Gen) -> ArbitraryProviderRecord {
            let record = ProviderRecord {
                key: ArbitraryKey::arbitrary(g).0,
                provider: PeerId::random(),
                expires: None,
                addresses: vec![],
            };
            ArbitraryProviderRecord(record)
        }
    }
    #[test]
    fn put_get_remove_record() {
        fn prop(r: ArbitraryRecord) {
            let rt = Runtime::new().expect("Cannot create runtime");
            rt.block_on(testing_thread(r));
        }
        quickcheck(prop as fn(_))
    }
    async fn testing_thread(r: ArbitraryRecord) {
        let r = r.0;
        let (network_event_sender, mut network_event_receiver) = mpsc::channel(1);
        let mut store = NodeRecordStore::with_config(
            PeerId::random(),
            Default::default(),
            Some(network_event_sender),
        );
        let store_cost_before = store.store_cost();
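        // An unverified PUT is only forwarded as a network event; nothing is
        // persisted yet, so the following GET must miss and the cost must not change.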
        assert!(store.put(r.clone()).is_ok());
        assert!(store.get(&r.key).is_none());
        assert_eq!(
            store.store_cost(),
            store_cost_before,
            "store cost should not change over unverified put"
        );
        let returned_record = if let Some(event) = network_event_receiver.recv().await {
            if let NetworkEvent::UnverifiedRecord(record) = event {
                record
            } else {
                panic!("Unexpected network event {event:?}");
            }
        } else {
            panic!("Failed recevied the record for further verification");
        };
        assert!(store
            .put_verified(returned_record, RecordType::Chunk)
            .is_ok());
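        // The disk write happens on a background task, so poll GET for a while
        // before concluding the record is missing.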
        let max_iterations = 10;
        let mut iteration = 0;
        while iteration < max_iterations {
            if store
                .get(&r.key)
                .is_some_and(|record| Cow::Borrowed(&r) == record)
            {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
            iteration += 1;
        }
        if iteration == max_iterations {
            panic!("record_store test failed with stored record cann't be read back");
        }
        assert_eq!(
            Some(Cow::Borrowed(&r)),
            store.get(&r.key),
            "record can be retrieved after put"
        );
        store.remove(&r.key);
        assert!(store.get(&r.key).is_none());
    }
    #[tokio::test]
    async fn pruning_on_full() -> Result<()> {
        let max_iterations = 10;
        let max_records = 50;
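        // Use a small record limit so that pruning is exercised quickly.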
        let store_config = NodeRecordStoreConfig {
            max_records,
            ..Default::default()
        };
        let self_id = PeerId::random();
        let mut store = NodeRecordStore::with_config(self_id, store_config.clone(), None);
        let mut stored_records: Vec<RecordKey> = vec![];
        let self_address = NetworkAddress::from_peer(self_id);
        for i in 0..100 {
            let record_key = NetworkAddress::from_peer(PeerId::random()).to_record_key();
            let value = match try_serialize_record(
                &(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
                RecordKind::Chunk,
            ) {
                Ok(value) => value.to_vec(),
                Err(err) => panic!("Cannot generate record value {err:?}"),
            };
            let record = Record {
                key: record_key.clone(),
                value,
                publisher: None,
                expires: None,
            };
            let retained_key = if i < max_records {
                assert!(store.put_verified(record, RecordType::Chunk).is_ok());
                record_key
            } else {
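                // stored_records is kept sorted by distance, so the last entry
                // is the furthest record currently held.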
                let furthest_key = stored_records.remove(stored_records.len() - 1);
                let furthest_addr = NetworkAddress::from_record_key(&furthest_key);
                let record_addr = NetworkAddress::from_record_key(&record_key);
                let (retained_key, pruned_key) = if self_address.distance(&furthest_addr)
                    > self_address.distance(&record_addr)
                {
                    // The incoming record is closer, so it should be stored and
                    // the furthest record pruned to make room.
                    assert!(store.put_verified(record, RecordType::Chunk).is_ok());
                    (record_key, furthest_key)
                } else {
                    // The incoming record is further away and the store is
                    // full, so the PUT should be rejected.
                    assert!(store.put_verified(record, RecordType::Chunk).is_err());
                    (furthest_key, record_key)
                };
                // Confirm the pruned record is removed from disk, allowing time
                // for the async deletion to complete.
                let mut iteration = 0;
                while iteration < max_iterations {
                    if NodeRecordStore::read_from_disk(&pruned_key, &store_config.storage_dir)
                        .is_none()
                    {
                        break;
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    iteration += 1;
                }
                if iteration == max_iterations {
                    panic!("record_store prune test failed with pruned record still exists.");
                }
                retained_key
            };
            // Confirm the retained record can be read back, allowing time for
            // the async write to complete.
            let mut iteration = 0;
            while iteration < max_iterations {
                if store.get(&retained_key).is_some() {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
                iteration += 1;
            }
            if iteration == max_iterations {
                panic!("record_store prune test failed with stored record cann't be read back");
            }
            stored_records.push(retained_key);
            stored_records.sort_by(|a, b| {
                let a = NetworkAddress::from_record_key(a);
                let b = NetworkAddress::from_record_key(b);
                self_address.distance(&a).cmp(&self_address.distance(&b))
            });
        }
        Ok(())
    }
    #[tokio::test]
    #[allow(clippy::mutable_key_type)]
    async fn get_records_within_distance_range() -> eyre::Result<()> {
        let max_records = 50;
        let store_config = NodeRecordStoreConfig {
            max_records,
            ..Default::default()
        };
        let self_id = PeerId::random();
        let mut store = NodeRecordStore::with_config(self_id, store_config, None);
        let mut stored_records: Vec<RecordKey> = vec![];
        let self_address = NetworkAddress::from_peer(self_id);
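        // Fill the store with max_records - 1 records, keeping stored_records
        // sorted by distance from our own address.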
        for _ in 0..max_records - 1 {
            let record_key = NetworkAddress::from_peer(PeerId::random()).to_record_key();
            let value = match try_serialize_record(
                &(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
                RecordKind::Chunk,
            ) {
                Ok(value) => value.to_vec(),
                Err(err) => panic!("Cannot generate record value {err:?}"),
            };
            let record = Record {
                key: record_key.clone(),
                value,
                publisher: None,
                expires: None,
            };
            // The store has spare capacity here, so each verified PUT should succeed.
            assert!(store.put_verified(record, RecordType::Chunk).is_ok());
            stored_records.push(record_key);
            stored_records.sort_by(|a, b| {
                let a = NetworkAddress::from_record_key(a);
                let b = NetworkAddress::from_record_key(b);
                self_address.distance(&a).cmp(&self_address.distance(&b))
            });
        }
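        // Use our distance to the halfway record as the distance range; half of
        // the stored records should then fall within it.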
        let halfway_record_address = NetworkAddress::from_record_key(
            stored_records
                .get((stored_records.len() / 2) - 1)
                .wrap_err("Could not parse record store key")?,
        );
        let distance = self_address.distance(&halfway_record_address);
        store.set_distance_range(distance);
        let record_keys: HashSet<_> = store.records.keys().cloned().collect();
        assert_eq!(
            store.get_records_within_distance_range(&record_keys, distance),
            stored_records.len() / 2
        );
        Ok(())
    }
}