use std::{
    cmp::Ordering,
    fmt::Debug,
    ops::{Deref, DerefMut},
    sync::Arc,
    time::{Duration, SystemTime},
};
use bytes::{Bytes, BytesMut};
use ed25519_dalek::{Signature, SignatureError};
use iroh_base::{base32, hash::Hash};
#[cfg(feature = "metrics")]
use iroh_metrics::{inc, inc_by};
use serde::{Deserialize, Serialize};
pub use crate::heads::AuthorHeads;
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
use crate::{
    keys::{Author, AuthorId, AuthorPublicKey, NamespaceId, NamespacePublicKey, NamespaceSecret},
    ranger::{self, Fingerprint, InsertOutcome, RangeEntry, RangeKey, RangeValue, Store},
    store::{self, fs::StoreInstance, DownloadPolicyStore, PublicKeyStore},
};
pub type ProtocolMessage = crate::ranger::Message<SignedEntry>;
pub type PeerIdBytes = [u8; 32];
pub const MAX_TIMESTAMP_FUTURE_SHIFT: u64 = 10 * 60 * Duration::from_secs(1).as_millis() as u64;
pub type ContentStatusCallback = Arc<dyn Fn(Hash) -> ContentStatus + Send + Sync + 'static>;
#[derive(Debug, Clone)]
pub enum Event {
    LocalInsert {
        namespace: NamespaceId,
        entry: SignedEntry,
    },
    RemoteInsert {
        namespace: NamespaceId,
        entry: SignedEntry,
        from: PeerIdBytes,
        should_download: bool,
        remote_content_status: ContentStatus,
    },
}
#[derive(Debug, Clone)]
pub enum InsertOrigin {
    Local,
    Sync {
        from: PeerIdBytes,
        remote_content_status: ContentStatus,
    },
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ContentStatus {
    Complete,
    Incomplete,
    Missing,
}
#[derive(Debug, Clone, Default)]
pub struct SyncOutcome {
    pub heads_received: AuthorHeads,
    pub num_recv: usize,
    pub num_sent: usize,
}
fn get_as_ptr<T>(value: &T) -> Option<usize> {
    use std::mem;
    if mem::size_of::<T>() == std::mem::size_of::<usize>()
        && mem::align_of::<T>() == mem::align_of::<usize>()
    {
        unsafe { Some(mem::transmute_copy(value)) }
    } else {
        None
    }
}
fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
    get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
}
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
impl Subscribers {
    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }
    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
        self.0.retain(|s| !same_channel(s, sender));
    }
    pub fn send(&mut self, event: Event) {
        self.0
            .retain(|sender| sender.send_blocking(event.clone()).is_ok())
    }
    pub fn len(&self) -> usize {
        self.0.len()
    }
    pub fn send_with(&mut self, f: impl FnOnce() -> Event) {
        if !self.0.is_empty() {
            self.send(f())
        }
    }
}
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    num_enum::IntoPrimitive,
    num_enum::TryFromPrimitive,
    strum::Display,
)]
#[repr(u8)]
#[strum(serialize_all = "snake_case")]
pub enum CapabilityKind {
    Write = 1,
    Read = 2,
}
#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From)]
pub enum Capability {
    Write(NamespaceSecret),
    Read(NamespaceId),
}
impl Capability {
    pub fn id(&self) -> NamespaceId {
        match self {
            Capability::Write(secret) => secret.id(),
            Capability::Read(id) => *id,
        }
    }
    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
        match self {
            Capability::Write(secret) => Ok(secret),
            Capability::Read(_) => Err(ReadOnly),
        }
    }
    pub fn kind(&self) -> CapabilityKind {
        match self {
            Capability::Write(_) => CapabilityKind::Write,
            Capability::Read(_) => CapabilityKind::Read,
        }
    }
    pub fn raw(&self) -> (u8, [u8; 32]) {
        let capability_repr: u8 = self.kind().into();
        let bytes = match self {
            Capability::Write(secret) => secret.to_bytes(),
            Capability::Read(id) => id.to_bytes(),
        };
        (capability_repr, bytes)
    }
    pub fn from_raw(kind: u8, bytes: &[u8; 32]) -> anyhow::Result<Self> {
        let kind: CapabilityKind = kind.try_into()?;
        let capability = match kind {
            CapabilityKind::Write => {
                let secret = NamespaceSecret::from_bytes(bytes);
                Capability::Write(secret)
            }
            CapabilityKind::Read => {
                let id = NamespaceId::from(bytes);
                Capability::Read(id)
            }
        };
        Ok(capability)
    }
    pub fn merge(&mut self, other: Capability) -> Result<bool, CapabilityError> {
        if other.id() != self.id() {
            return Err(CapabilityError::NamespaceMismatch);
        }
        if matches!(self, Capability::Read(_)) && matches!(other, Capability::Write(_)) {
            let _ = std::mem::replace(self, other);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}
#[derive(Debug, thiserror::Error)]
pub enum CapabilityError {
    #[error("Namespaces are not the same")]
    NamespaceMismatch,
}
#[derive(derive_more::Debug)]
pub struct ReplicaInfo {
    pub(crate) capability: Capability,
    subscribers: Subscribers,
    #[debug("ContentStatusCallback")]
    content_status_cb: Option<ContentStatusCallback>,
    closed: bool,
}
impl ReplicaInfo {
    pub fn new(capability: Capability) -> Self {
        Self {
            capability,
            subscribers: Default::default(),
            content_status_cb: None,
            closed: false,
        }
    }
    pub fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.subscribers.subscribe(sender)
    }
    pub fn unsubscribe(&mut self, sender: &async_channel::Sender<Event>) {
        self.subscribers.unsubscribe(sender)
    }
    pub fn subscribers_count(&self) -> usize {
        self.subscribers.len()
    }
    pub fn set_content_status_callback(&mut self, cb: ContentStatusCallback) -> bool {
        if self.content_status_cb.is_some() {
            false
        } else {
            self.content_status_cb = Some(cb);
            true
        }
    }
    fn ensure_open(&self) -> Result<(), InsertError> {
        if self.closed() {
            Err(InsertError::Closed)
        } else {
            Ok(())
        }
    }
    pub fn closed(&self) -> bool {
        self.closed
    }
    pub fn merge_capability(&mut self, capability: Capability) -> Result<bool, CapabilityError> {
        self.capability.merge(capability)
    }
}
#[derive(derive_more::Debug)]
pub struct Replica<'a, I = Box<ReplicaInfo>> {
    pub(crate) store: StoreInstance<'a>,
    pub(crate) info: I,
}
impl<'a, I> Replica<'a, I>
where
    I: Deref<Target = ReplicaInfo> + DerefMut,
{
    pub fn new(store: StoreInstance<'a>, info: I) -> Self {
        Replica { info, store }
    }
    pub fn insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        hash: Hash,
        len: u64,
    ) -> Result<usize, InsertError> {
        if len == 0 || hash == Hash::EMPTY {
            return Err(InsertError::EntryIsEmpty);
        }
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), key);
        let record = Record::new_current(hash, len);
        let entry = Entry::new(id, record);
        let secret = self.secret_key()?;
        let signed_entry = entry.sign(secret, author);
        self.insert_entry(signed_entry, InsertOrigin::Local)
    }
    pub fn delete_prefix(
        &mut self,
        prefix: impl AsRef<[u8]>,
        author: &Author,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        let id = RecordIdentifier::new(self.id(), author.id(), prefix);
        let entry = Entry::new_empty(id);
        let signed_entry = entry.sign(self.secret_key()?, author);
        self.insert_entry(signed_entry, InsertOrigin::Local)
    }
    pub fn insert_remote_entry(
        &mut self,
        entry: SignedEntry,
        received_from: PeerIdBytes,
        content_status: ContentStatus,
    ) -> Result<usize, InsertError> {
        self.info.ensure_open()?;
        entry.validate_empty()?;
        let origin = InsertOrigin::Sync {
            from: received_from,
            remote_content_status: content_status,
        };
        self.insert_entry(entry, origin)
    }
    fn insert_entry(
        &mut self,
        entry: SignedEntry,
        origin: InsertOrigin,
    ) -> Result<usize, InsertError> {
        let namespace = self.id();
        #[cfg(feature = "metrics")]
        let len = entry.content_len();
        let store = &self.store;
        validate_entry(system_time_now(), store, namespace, &entry, &origin)?;
        let outcome = self.store.put(entry.clone()).map_err(InsertError::Store)?;
        tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");
        let removed_count = match outcome {
            InsertOutcome::Inserted { removed } => removed,
            InsertOutcome::NotInserted => return Err(InsertError::NewerEntryExists),
        };
        let insert_event = match origin {
            InsertOrigin::Local => {
                #[cfg(feature = "metrics")]
                {
                    inc!(Metrics, new_entries_local);
                    inc_by!(Metrics, new_entries_local_size, len);
                }
                Event::LocalInsert { namespace, entry }
            }
            InsertOrigin::Sync {
                from,
                remote_content_status,
            } => {
                #[cfg(feature = "metrics")]
                {
                    inc!(Metrics, new_entries_remote);
                    inc_by!(Metrics, new_entries_remote_size, len);
                }
                let download_policy = self
                    .store
                    .get_download_policy(&self.id())
                    .unwrap_or_default();
                let should_download = download_policy.matches(entry.entry());
                Event::RemoteInsert {
                    namespace,
                    entry,
                    from,
                    should_download,
                    remote_content_status,
                }
            }
        };
        self.info.subscribers.send(insert_event);
        Ok(removed_count)
    }
    pub fn hash_and_insert(
        &mut self,
        key: impl AsRef<[u8]>,
        author: &Author,
        data: impl AsRef<[u8]>,
    ) -> Result<Hash, InsertError> {
        self.info.ensure_open()?;
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        self.insert(key, author, hash, len)?;
        Ok(hash)
    }
    pub fn record_id(&self, key: impl AsRef<[u8]>, author: &Author) -> RecordIdentifier {
        RecordIdentifier::new(self.info.capability.id(), author.id(), key)
    }
    pub fn sync_initial_message(&mut self) -> anyhow::Result<crate::ranger::Message<SignedEntry>> {
        self.info.ensure_open().map_err(anyhow::Error::from)?;
        self.store.initial_message().map_err(Into::into)
    }
    pub fn sync_process_message(
        &mut self,
        message: crate::ranger::Message<SignedEntry>,
        from_peer: PeerIdBytes,
        state: &mut SyncOutcome,
    ) -> Result<Option<crate::ranger::Message<SignedEntry>>, anyhow::Error> {
        self.info.ensure_open()?;
        let my_namespace = self.id();
        let now = system_time_now();
        state.num_recv += message.value_count();
        for (entry, _content_status) in message.values() {
            state
                .heads_received
                .insert(entry.author(), entry.timestamp());
        }
        let cb = self.info.content_status_cb.clone();
        let download_policy = self
            .store
            .get_download_policy(&my_namespace)
            .unwrap_or_default();
        let reply = self.store.process_message(
            &Default::default(),
            message,
            |store, entry, content_status| {
                let origin = InsertOrigin::Sync {
                    from: from_peer,
                    remote_content_status: content_status,
                };
                validate_entry(now, store, my_namespace, entry, &origin).is_ok()
            },
            |_store, entry, content_status| {
                self.info.subscribers.send_with(|| {
                    let should_download = download_policy.matches(entry.entry());
                    Event::RemoteInsert {
                        from: from_peer,
                        namespace: my_namespace,
                        entry: entry.clone(),
                        should_download,
                        remote_content_status: content_status,
                    }
                })
            },
            |_store, entry| {
                if let Some(cb) = cb.as_ref() {
                    cb(entry.content_hash())
                } else {
                    ContentStatus::Missing
                }
            },
        )?;
        if let Some(ref reply) = reply {
            state.num_sent += reply.value_count();
        }
        Ok(reply)
    }
    pub fn id(&self) -> NamespaceId {
        self.info.capability.id()
    }
    pub fn capability(&self) -> &Capability {
        &self.info.capability
    }
    pub fn secret_key(&self) -> Result<&NamespaceSecret, ReadOnly> {
        self.info.capability.secret_key()
    }
}
#[derive(Debug, thiserror::Error)]
#[error("Replica allows read access only.")]
pub struct ReadOnly;
fn validate_entry<S: ranger::Store<SignedEntry> + PublicKeyStore>(
    now: u64,
    store: &S,
    expected_namespace: NamespaceId,
    entry: &SignedEntry,
    origin: &InsertOrigin,
) -> Result<(), ValidationFailure> {
    if entry.namespace() != expected_namespace {
        return Err(ValidationFailure::InvalidNamespace);
    }
    if !matches!(origin, InsertOrigin::Local) && entry.verify(store).is_err() {
        return Err(ValidationFailure::BadSignature);
    }
    if entry.timestamp() > now + MAX_TIMESTAMP_FUTURE_SHIFT {
        return Err(ValidationFailure::TooFarInTheFuture);
    }
    Ok(())
}
#[derive(thiserror::Error, derive_more::Debug, derive_more::From)]
pub enum InsertError {
    #[error("storage error")]
    Store(anyhow::Error),
    #[error("validation failure")]
    Validation(#[from] ValidationFailure),
    #[error("A newer entry exists for either this entry's key or a prefix of the key.")]
    NewerEntryExists,
    #[error("Attempted to insert an empty entry")]
    EntryIsEmpty,
    #[error("Attempted to insert to read only replica")]
    #[from(ReadOnly)]
    ReadOnly,
    #[error("replica is closed")]
    Closed,
}
#[derive(thiserror::Error, Debug)]
pub enum ValidationFailure {
    #[error("Entry namespace does not match the current replica")]
    InvalidNamespace,
    #[error("Entry signature is invalid")]
    BadSignature,
    #[error("Entry timestamp is too far in the future.")]
    TooFarInTheFuture,
    #[error("Entry has length 0 but not the empty hash, or the empty hash but not length 0")]
    InvalidEmptyEntry,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedEntry {
    signature: EntrySignature,
    entry: Entry,
}
impl From<SignedEntry> for Entry {
    fn from(value: SignedEntry) -> Self {
        value.entry
    }
}
impl PartialOrd for SignedEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for SignedEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.entry.cmp(&other.entry)
    }
}
impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.id
            .cmp(&other.id)
            .then_with(|| self.record.cmp(&other.record))
    }
}
impl SignedEntry {
    pub(crate) fn new(signature: EntrySignature, entry: Entry) -> Self {
        SignedEntry { signature, entry }
    }
    pub fn from_entry(entry: Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
        let signature = EntrySignature::from_entry(&entry, namespace, author);
        SignedEntry { signature, entry }
    }
    pub fn from_parts(
        namespace: &NamespaceSecret,
        author: &Author,
        key: impl AsRef<[u8]>,
        record: Record,
    ) -> Self {
        let id = RecordIdentifier::new(namespace.id(), author.id(), key);
        let entry = Entry::new(id, record);
        Self::from_entry(entry, namespace, author)
    }
    pub fn verify<S: store::PublicKeyStore>(&self, store: &S) -> Result<(), SignatureError> {
        self.signature.verify(
            &self.entry,
            &self.entry.namespace().public_key(store)?,
            &self.entry.author().public_key(store)?,
        )
    }
    pub fn signature(&self) -> &EntrySignature {
        &self.signature
    }
    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
        self.entry().validate_empty()
    }
    pub fn entry(&self) -> &Entry {
        &self.entry
    }
    pub fn content_hash(&self) -> Hash {
        self.entry().content_hash()
    }
    pub fn content_len(&self) -> u64 {
        self.entry().content_len()
    }
    pub fn author_bytes(&self) -> AuthorId {
        self.entry().id().author()
    }
    pub fn key(&self) -> &[u8] {
        self.entry().id().key()
    }
    pub fn timestamp(&self) -> u64 {
        self.entry().timestamp()
    }
}
impl RangeEntry for SignedEntry {
    type Key = RecordIdentifier;
    type Value = Record;
    fn key(&self) -> &Self::Key {
        &self.entry.id
    }
    fn value(&self) -> &Self::Value {
        &self.entry.record
    }
    fn as_fingerprint(&self) -> crate::ranger::Fingerprint {
        let mut hasher = blake3::Hasher::new();
        hasher.update(self.namespace().as_ref());
        hasher.update(self.author_bytes().as_ref());
        hasher.update(self.key());
        hasher.update(&self.timestamp().to_be_bytes());
        hasher.update(self.content_hash().as_bytes());
        Fingerprint(hasher.finalize().into())
    }
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntrySignature {
    author_signature: Signature,
    namespace_signature: Signature,
}
impl Debug for EntrySignature {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EntrySignature")
            .field(
                "namespace_signature",
                &base32::fmt(self.namespace_signature.to_bytes()),
            )
            .field(
                "author_signature",
                &base32::fmt(self.author_signature.to_bytes()),
            )
            .finish()
    }
}
impl EntrySignature {
    pub fn from_entry(entry: &Entry, namespace: &NamespaceSecret, author: &Author) -> Self {
        let bytes = entry.to_vec();
        let namespace_signature = namespace.sign(&bytes);
        let author_signature = author.sign(&bytes);
        EntrySignature {
            author_signature,
            namespace_signature,
        }
    }
    pub fn verify(
        &self,
        entry: &Entry,
        namespace: &NamespacePublicKey,
        author: &AuthorPublicKey,
    ) -> Result<(), SignatureError> {
        let bytes = entry.to_vec();
        namespace.verify(&bytes, &self.namespace_signature)?;
        author.verify(&bytes, &self.author_signature)?;
        Ok(())
    }
    pub(crate) fn from_parts(namespace_sig: &[u8; 64], author_sig: &[u8; 64]) -> Self {
        let namespace_signature = Signature::from_bytes(namespace_sig);
        let author_signature = Signature::from_bytes(author_sig);
        EntrySignature {
            author_signature,
            namespace_signature,
        }
    }
    pub(crate) fn author(&self) -> &Signature {
        &self.author_signature
    }
    pub(crate) fn namespace(&self) -> &Signature {
        &self.namespace_signature
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Entry {
    id: RecordIdentifier,
    record: Record,
}
impl Entry {
    pub fn new(id: RecordIdentifier, record: Record) -> Self {
        Entry { id, record }
    }
    pub fn new_empty(id: RecordIdentifier) -> Self {
        Entry {
            id,
            record: Record::empty_current(),
        }
    }
    pub fn validate_empty(&self) -> Result<(), ValidationFailure> {
        match (self.content_hash() == Hash::EMPTY, self.content_len() == 0) {
            (true, true) => Ok(()),
            (false, false) => Ok(()),
            (true, false) => Err(ValidationFailure::InvalidEmptyEntry),
            (false, true) => Err(ValidationFailure::InvalidEmptyEntry),
        }
    }
    pub fn id(&self) -> &RecordIdentifier {
        &self.id
    }
    pub fn namespace(&self) -> NamespaceId {
        self.id.namespace()
    }
    pub fn author(&self) -> AuthorId {
        self.id.author()
    }
    pub fn key(&self) -> &[u8] {
        self.id.key()
    }
    pub fn record(&self) -> &Record {
        &self.record
    }
    pub fn encode(&self, out: &mut Vec<u8>) {
        self.id.encode(out);
        self.record.encode(out);
    }
    pub fn to_vec(&self) -> Vec<u8> {
        let mut out = Vec::new();
        self.encode(&mut out);
        out
    }
    pub fn sign(self, namespace: &NamespaceSecret, author: &Author) -> SignedEntry {
        SignedEntry::from_entry(self, namespace, author)
    }
}
const NAMESPACE_BYTES: std::ops::Range<usize> = 0..32;
const AUTHOR_BYTES: std::ops::Range<usize> = 32..64;
const KEY_BYTES: std::ops::RangeFrom<usize> = 64..;
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecordIdentifier(Bytes);
impl Default for RecordIdentifier {
    fn default() -> Self {
        Self::new(NamespaceId::default(), AuthorId::default(), b"")
    }
}
impl Debug for RecordIdentifier {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RecordIdentifier")
            .field("namespace", &self.namespace())
            .field("author", &self.author())
            .field("key", &std::string::String::from_utf8_lossy(self.key()))
            .finish()
    }
}
impl RangeKey for RecordIdentifier {
    #[cfg(test)]
    fn is_prefix_of(&self, other: &Self) -> bool {
        other.as_ref().starts_with(self.as_ref())
    }
}
fn system_time_now() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("time drift")
        .as_micros() as u64
}
impl RecordIdentifier {
    pub fn new(
        namespace: impl Into<NamespaceId>,
        author: impl Into<AuthorId>,
        key: impl AsRef<[u8]>,
    ) -> Self {
        let mut bytes = BytesMut::with_capacity(32 + 32 + key.as_ref().len());
        bytes.extend_from_slice(namespace.into().as_bytes());
        bytes.extend_from_slice(author.into().as_bytes());
        bytes.extend_from_slice(key.as_ref());
        Self(bytes.freeze())
    }
    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.0);
    }
    pub fn as_bytes(&self) -> Bytes {
        self.0.clone()
    }
    pub fn as_byte_tuple(&self) -> (&[u8; 32], &[u8; 32], &[u8]) {
        (
            self.0[NAMESPACE_BYTES].try_into().unwrap(),
            self.0[AUTHOR_BYTES].try_into().unwrap(),
            &self.0[KEY_BYTES],
        )
    }
    pub fn to_byte_tuple(&self) -> ([u8; 32], [u8; 32], Bytes) {
        (
            self.0[NAMESPACE_BYTES].try_into().unwrap(),
            self.0[AUTHOR_BYTES].try_into().unwrap(),
            self.0.slice(KEY_BYTES),
        )
    }
    pub fn key(&self) -> &[u8] {
        &self.0[KEY_BYTES]
    }
    pub fn key_bytes(&self) -> Bytes {
        self.0.slice(KEY_BYTES)
    }
    pub fn namespace(&self) -> NamespaceId {
        let value: &[u8; 32] = &self.0[NAMESPACE_BYTES].try_into().unwrap();
        value.into()
    }
    pub fn author(&self) -> AuthorId {
        let value: &[u8; 32] = &self.0[AUTHOR_BYTES].try_into().unwrap();
        value.into()
    }
}
impl AsRef<[u8]> for RecordIdentifier {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
impl Deref for SignedEntry {
    type Target = Entry;
    fn deref(&self) -> &Self::Target {
        &self.entry
    }
}
impl Deref for Entry {
    type Target = Record;
    fn deref(&self) -> &Self::Target {
        &self.record
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Record {
    len: u64,
    hash: Hash,
    timestamp: u64,
}
impl RangeValue for Record {}
impl Ord for Record {
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp
            .cmp(&other.timestamp)
            .then_with(|| self.hash.cmp(&other.hash))
    }
}
impl PartialOrd for Record {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Record {
    pub fn new(hash: Hash, len: u64, timestamp: u64) -> Self {
        debug_assert!(
            len != 0 || hash == Hash::EMPTY,
            "if `len` is 0 then `hash` must be the hash of the empty byte range"
        );
        Record {
            hash,
            len,
            timestamp,
        }
    }
    pub fn empty(timestamp: u64) -> Self {
        Self::new(Hash::EMPTY, 0, timestamp)
    }
    pub fn empty_current() -> Self {
        Self::new_current(Hash::EMPTY, 0)
    }
    pub fn is_empty(&self) -> bool {
        self.hash == Hash::EMPTY
    }
    pub fn new_current(hash: Hash, len: u64) -> Self {
        let timestamp = system_time_now();
        Self::new(hash, len, timestamp)
    }
    pub fn content_len(&self) -> u64 {
        self.len
    }
    pub fn content_hash(&self) -> Hash {
        self.hash
    }
    pub fn timestamp(&self) -> u64 {
        self.timestamp
    }
    #[cfg(test)]
    pub(crate) fn current_from_data(data: impl AsRef<[u8]>) -> Self {
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        Self::new_current(hash, len)
    }
    #[cfg(test)]
    pub(crate) fn from_data(data: impl AsRef<[u8]>, timestamp: u64) -> Self {
        let len = data.as_ref().len() as u64;
        let hash = Hash::new(data);
        Self::new(hash, len, timestamp)
    }
    pub(crate) fn encode(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.len.to_be_bytes());
        out.extend_from_slice(self.hash.as_ref());
        out.extend_from_slice(&self.timestamp.to_be_bytes())
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use anyhow::Result;
    use rand_core::SeedableRng;
    use super::*;
    use crate::{
        actor::SyncHandle,
        ranger::{Range, Store as _},
        store::{OpenError, Query, SortBy, SortDirection, Store},
    };
    #[test]
    fn test_basics_memory() -> Result<()> {
        let store = store::Store::memory();
        test_basics(store)?;
        Ok(())
    }
    #[test]
    fn test_basics_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_basics(store)?;
        Ok(())
    }
    fn test_basics(mut store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let alice = Author::new(&mut rng);
        let bob = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let record_id = RecordIdentifier::new(myspace.id(), alice.id(), "/my/key");
        let record = Record::current_from_data(b"this is my cool data");
        let entry = Entry::new(record_id, record);
        let signed_entry = entry.sign(&myspace, &alice);
        signed_entry.verify(&()).expect("failed to verify");
        let mut my_replica = store.new_replica(myspace.clone())?;
        for i in 0..10 {
            my_replica.hash_and_insert(
                format!("/{i}"),
                &alice,
                format!("{i}: hello from alice"),
            )?;
        }
        for i in 0..10 {
            let res = store
                .get_exact(myspace.id(), alice.id(), format!("/{i}"), false)?
                .unwrap();
            let len = format!("{i}: hello from alice").as_bytes().len() as u64;
            assert_eq!(res.entry().record().content_len(), len);
            res.verify(&())?;
        }
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica.hash_and_insert("/cool/path", &alice, "round 1")?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();
        let mut my_replica = store.new_replica(myspace.clone())?;
        my_replica.hash_and_insert("/cool/path", &alice, "round 2")?;
        let _entry = store
            .get_exact(myspace.id(), alice.id(), "/cool/path", false)?
            .unwrap();
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 0);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);
        let mut my_replica = store.new_replica(myspace.clone())?;
        let _entry = my_replica.hash_and_insert("/cool/path", &bob, "bob round 1")?;
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 11);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_exact(b"/cool/path"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 2);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(alice.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::author(bob.id()).key_prefix(b"/cool"))?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 1);
        let entries: Vec<_> = store
            .get_many(myspace.id(), Query::all())?
            .collect::<Result<_>>()?;
        assert_eq!(entries.len(), 12);
        let mut my_replica = store.new_replica(myspace.clone())?;
        let entries_second: Vec<_> = my_replica
            .store
            .get_range(Range::new(
                RecordIdentifier::default(),
                RecordIdentifier::default(),
            ))?
            .collect::<Result<_, _>>()?;
        assert_eq!(entries_second.len(), 12);
        assert_eq!(entries, entries_second.into_iter().collect::<Vec<_>>());
        test_lru_cache_like_behaviour(&mut store, myspace.id())
    }
    fn test_lru_cache_like_behaviour(store: &mut Store, namespace: NamespaceId) -> Result<()> {
        #[track_caller]
        fn verify_peers(store: &mut Store, namespace: NamespaceId, expected_peers: &Vec<[u8; 32]>) {
            assert_eq!(
                expected_peers,
                &store
                    .get_sync_peers(&namespace)
                    .unwrap()
                    .unwrap()
                    .collect::<Vec<_>>(),
                "sync peers differ"
            );
        }
        let count = super::store::PEERS_PER_DOC_CACHE_SIZE.get();
        let mut expected_peers = Vec::with_capacity(count);
        for i in 0..count as u8 {
            let peer = [i; 32];
            expected_peers.insert(0, peer);
            store.register_useful_peer(namespace, peer)?;
        }
        verify_peers(store, namespace, &expected_peers);
        expected_peers.pop();
        let newer_peer = [count as u8; 32];
        expected_peers.insert(0, newer_peer);
        store.register_useful_peer(namespace, newer_peer)?;
        verify_peers(store, namespace, &expected_peers);
        let refreshed_peer = expected_peers.remove(2);
        expected_peers.insert(0, refreshed_peer);
        store.register_useful_peer(namespace, refreshed_peer)?;
        verify_peers(store, namespace, &expected_peers);
        Ok(())
    }
    #[test]
    fn test_content_hashes_iterator_memory() -> Result<()> {
        let store = store::Store::memory();
        test_content_hashes_iterator(store)
    }
    #[test]
    fn test_content_hashes_iterator_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_content_hashes_iterator(store)
    }
    fn test_content_hashes_iterator(mut store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let mut expected = HashSet::new();
        let n_replicas = 3;
        let n_entries = 4;
        for i in 0..n_replicas {
            let namespace = NamespaceSecret::new(&mut rng);
            let author = store.new_author(&mut rng)?;
            let mut replica = store.new_replica(namespace)?;
            for j in 0..n_entries {
                let key = format!("{j}");
                let data = format!("{i}:{j}");
                let hash = replica.hash_and_insert(key, &author, data)?;
                expected.insert(hash);
            }
        }
        assert_eq!(expected.len(), n_replicas * n_entries);
        let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
        assert_eq!(actual, expected);
        Ok(())
    }
    #[test]
    fn test_multikey() {
        let mut rng = rand::thread_rng();
        let k = ["a", "c", "z"];
        let mut n: Vec<_> = (0..3).map(|_| NamespaceSecret::new(&mut rng)).collect();
        n.sort_by_key(|n| n.id());
        let mut a: Vec<_> = (0..3).map(|_| Author::new(&mut rng)).collect();
        a.sort_by_key(|a| a.id());
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[0].id(), a[0].id(), k[1]);
            let ri2 = RecordIdentifier::new(n[0].id(), a[0].id(), k[2]);
            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");
            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");
            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[0].id(), a[1].id(), k[0]);
            let ri2 = RecordIdentifier::new(n[0].id(), a[2].id(), k[0]);
            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");
            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }
        {
            let ri0 = RecordIdentifier::new(n[0].id(), a[0].id(), k[0]);
            let ri1 = RecordIdentifier::new(n[1].id(), a[0].id(), k[1]);
            let ri2 = RecordIdentifier::new(n[2].id(), a[0].id(), k[2]);
            let range = Range::new(ri0.clone(), ri2.clone());
            assert!(range.contains(&ri0), "start");
            assert!(range.contains(&ri1), "inside");
            assert!(!range.contains(&ri2), "end");
            assert!(ri0 < ri1);
            assert!(ri1 < ri2);
        }
        {
            let a0 = a[0].id();
            let a1 = a[1].id();
            let n0 = n[0].id();
            let n1 = n[1].id();
            let k0 = k[0];
            let k1 = k[1];
            assert!(RecordIdentifier::new(n0, a0, k0) < RecordIdentifier::new(n1, a1, k1));
            assert!(RecordIdentifier::new(n0, a0, k1) < RecordIdentifier::new(n1, a0, k0));
            assert!(RecordIdentifier::new(n0, a1, k0) < RecordIdentifier::new(n0, a1, k1));
            assert!(RecordIdentifier::new(n1, a1, k0) < RecordIdentifier::new(n1, a1, k1));
        }
    }
    #[test]
    fn test_timestamps_memory() -> Result<()> {
        let store = store::Store::memory();
        test_timestamps(store)?;
        Ok(())
    }
    #[test]
    fn test_timestamps_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_timestamps(store)?;
        Ok(())
    }
    fn test_timestamps(mut store: Store) -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let namespace = NamespaceSecret::new(&mut rng);
        let _replica = store.new_replica(namespace.clone())?;
        let author = store.new_author(&mut rng)?;
        store.close_replica(namespace.id());
        let mut replica = store.open_replica(&namespace.id())?;
        let key = b"hello";
        let value = b"world";
        let entry = {
            let timestamp = 2;
            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
            let record = Record::from_data(value, timestamp);
            Entry::new(id, record).sign(&namespace, &author)
        };
        replica
            .insert_entry(entry.clone(), InsertOrigin::Local)
            .unwrap();
        store.close_replica(namespace.id());
        let res = store
            .get_exact(namespace.id(), author.id(), key, false)?
            .unwrap();
        assert_eq!(res, entry);
        let entry2 = {
            let timestamp = 1;
            let id = RecordIdentifier::new(namespace.id(), author.id(), key);
            let record = Record::from_data(value, timestamp);
            Entry::new(id, record).sign(&namespace, &author)
        };
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.insert_entry(entry2, InsertOrigin::Local);
        store.close_replica(namespace.id());
        assert!(matches!(res, Err(InsertError::NewerEntryExists)));
        let res = store
            .get_exact(namespace.id(), author.id(), key, false)?
            .unwrap();
        assert_eq!(res, entry);
        Ok(())
    }
    #[test]
    fn test_replica_sync_memory() -> Result<()> {
        let alice_store = store::Store::memory();
        let bob_store = store::Store::memory();
        test_replica_sync(alice_store, bob_store)?;
        Ok(())
    }
    #[test]
    fn test_replica_sync_fs() -> Result<()> {
        let alice_dbfile = tempfile::NamedTempFile::new()?;
        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
        let bob_dbfile = tempfile::NamedTempFile::new()?;
        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
        test_replica_sync(alice_store, bob_store)?;
        Ok(())
    }
    fn test_replica_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
        let alice_set = ["ape", "eel", "fox", "gnu"];
        let bob_set = ["bee", "cat", "doe", "eel", "fox", "hog"];
        let mut rng = rand::thread_rng();
        let author = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut alice = alice_store.new_replica(myspace.clone())?;
        for el in &alice_set {
            alice.hash_and_insert(el, &author, el.as_bytes())?;
        }
        let mut bob = bob_store.new_replica(myspace.clone())?;
        for el in &bob_set {
            bob.hash_and_insert(el, &author, el.as_bytes())?;
        }
        let (alice_out, bob_out) = sync(&mut alice, &mut bob)?;
        assert_eq!(alice_out.num_sent, 2);
        assert_eq!(bob_out.num_recv, 2);
        assert_eq!(alice_out.num_recv, 6);
        assert_eq!(bob_out.num_sent, 6);
        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
        Ok(())
    }
    #[test]
    fn test_replica_timestamp_sync_memory() -> Result<()> {
        let alice_store = store::Store::memory();
        let bob_store = store::Store::memory();
        test_replica_timestamp_sync(alice_store, bob_store)?;
        Ok(())
    }
    #[test]
    fn test_replica_timestamp_sync_fs() -> Result<()> {
        let alice_dbfile = tempfile::NamedTempFile::new()?;
        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
        let bob_dbfile = tempfile::NamedTempFile::new()?;
        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
        test_replica_timestamp_sync(alice_store, bob_store)?;
        Ok(())
    }
    fn test_replica_timestamp_sync(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let mut alice = alice_store.new_replica(namespace.clone())?;
        let mut bob = bob_store.new_replica(namespace.clone())?;
        let key = b"key";
        let alice_value = b"alice";
        let bob_value = b"bob";
        let _alice_hash = alice.hash_and_insert(key, &author, alice_value)?;
        let bob_hash = bob.hash_and_insert(key, &author, bob_value)?;
        sync(&mut alice, &mut bob)?;
        assert_eq!(
            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
            Some(bob_hash)
        );
        assert_eq!(
            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
            Some(bob_hash)
        );
        let mut alice = alice_store.new_replica(namespace.clone())?;
        let mut bob = bob_store.new_replica(namespace.clone())?;
        let alice_value_2 = b"alice2";
        let _bob_hash_2 = bob.hash_and_insert(key, &author, bob_value)?;
        let alice_hash_2 = alice.hash_and_insert(key, &author, alice_value_2)?;
        sync(&mut alice, &mut bob)?;
        assert_eq!(
            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
            Some(alice_hash_2)
        );
        assert_eq!(
            get_content_hash(&mut alice_store, namespace.id(), author.id(), key)?,
            Some(alice_hash_2)
        );
        Ok(())
    }
    #[test]
    fn test_future_timestamp() -> Result<()> {
        let mut rng = rand::thread_rng();
        let mut store = store::Store::memory();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let mut replica = store.new_replica(namespace.clone())?;
        let key = b"hi";
        let t = system_time_now();
        let record = Record::from_data(b"1", t);
        let entry0 = SignedEntry::from_parts(&namespace, &author, key, record);
        replica.insert_entry(entry0.clone(), InsertOrigin::Local)?;
        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry0
        );
        let mut replica = store.new_replica(namespace.clone())?;
        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT - 10000;
        let record = Record::from_data(b"2", t);
        let entry1 = SignedEntry::from_parts(&namespace, &author, key, record);
        replica.insert_entry(entry1.clone(), InsertOrigin::Local)?;
        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry1
        );
        let mut replica = store.new_replica(namespace.clone())?;
        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT;
        let record = Record::from_data(b"2", t);
        let entry2 = SignedEntry::from_parts(&namespace, &author, key, record);
        replica.insert_entry(entry2.clone(), InsertOrigin::Local)?;
        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry2
        );
        let mut replica = store.new_replica(namespace.clone())?;
        let t = system_time_now() + MAX_TIMESTAMP_FUTURE_SHIFT + 10000;
        let record = Record::from_data(b"2", t);
        let entry3 = SignedEntry::from_parts(&namespace, &author, key, record);
        let res = replica.insert_entry(entry3, InsertOrigin::Local);
        assert!(matches!(
            res,
            Err(InsertError::Validation(
                ValidationFailure::TooFarInTheFuture
            ))
        ));
        assert_eq!(
            get_entry(&mut store, namespace.id(), author.id(), key)?,
            entry2
        );
        Ok(())
    }
    #[test]
    fn test_insert_empty() -> Result<()> {
        let mut store = store::Store::memory();
        let mut rng = rand::thread_rng();
        let alice = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut replica = store.new_replica(myspace.clone())?;
        let hash = Hash::new(b"");
        let res = replica.insert(b"foo", &alice, hash, 0);
        assert!(matches!(res, Err(InsertError::EntryIsEmpty)));
        Ok(())
    }
    #[test]
    fn test_prefix_delete_memory() -> Result<()> {
        let store = store::Store::memory();
        test_prefix_delete(store)?;
        Ok(())
    }
    #[test]
    fn test_prefix_delete_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_prefix_delete(store)?;
        Ok(())
    }
    fn test_prefix_delete(mut store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let alice = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut replica = store.new_replica(myspace.clone())?;
        let hash1 = replica.hash_and_insert(b"foobar", &alice, b"hello")?;
        let hash2 = replica.hash_and_insert(b"fooboo", &alice, b"world")?;
        assert_eq!(
            get_content_hash(&mut store, myspace.id(), alice.id(), b"foobar")?,
            Some(hash1)
        );
        assert_eq!(
            get_content_hash(&mut store, myspace.id(), alice.id(), b"fooboo")?,
            Some(hash2)
        );
        let mut replica = store.new_replica(myspace.clone())?;
        let deleted = replica.delete_prefix(b"foo", &alice)?;
        assert_eq!(deleted, 2);
        assert_eq!(
            store.get_exact(myspace.id(), alice.id(), b"foobar", false)?,
            None
        );
        assert_eq!(
            store.get_exact(myspace.id(), alice.id(), b"fooboo", false)?,
            None
        );
        assert_eq!(
            store.get_exact(myspace.id(), alice.id(), b"foo", false)?,
            None
        );
        Ok(())
    }
    #[test]
    fn test_replica_sync_delete_memory() -> Result<()> {
        let alice_store = store::Store::memory();
        let bob_store = store::Store::memory();
        test_replica_sync_delete(alice_store, bob_store)
    }
    #[test]
    fn test_replica_sync_delete_fs() -> Result<()> {
        let alice_dbfile = tempfile::NamedTempFile::new()?;
        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
        let bob_dbfile = tempfile::NamedTempFile::new()?;
        let bob_store = store::fs::Store::persistent(bob_dbfile.path())?;
        test_replica_sync_delete(alice_store, bob_store)
    }
    fn test_replica_sync_delete(mut alice_store: Store, mut bob_store: Store) -> Result<()> {
        let alice_set = ["foot"];
        let bob_set = ["fool", "foo", "fog"];
        let mut rng = rand::thread_rng();
        let author = Author::new(&mut rng);
        let myspace = NamespaceSecret::new(&mut rng);
        let mut alice = alice_store.new_replica(myspace.clone())?;
        for el in &alice_set {
            alice.hash_and_insert(el, &author, el.as_bytes())?;
        }
        let mut bob = bob_store.new_replica(myspace.clone())?;
        for el in &bob_set {
            bob.hash_and_insert(el, &author, el.as_bytes())?;
        }
        sync(&mut alice, &mut bob)?;
        check_entries(&mut alice_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut alice_store, &myspace.id(), &author, &bob_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &alice_set)?;
        check_entries(&mut bob_store, &myspace.id(), &author, &bob_set)?;
        let mut alice = alice_store.new_replica(myspace.clone())?;
        let mut bob = bob_store.new_replica(myspace.clone())?;
        alice.delete_prefix("foo", &author)?;
        bob.hash_and_insert("fooz", &author, "fooz".as_bytes())?;
        sync(&mut alice, &mut bob)?;
        check_entries(&mut alice_store, &myspace.id(), &author, &["fog", "fooz"])?;
        check_entries(&mut bob_store, &myspace.id(), &author, &["fog", "fooz"])?;
        Ok(())
    }
    #[test]
    fn test_replica_remove_memory() -> Result<()> {
        let alice_store = store::Store::memory();
        test_replica_remove(alice_store)
    }
    #[test]
    fn test_replica_remove_fs() -> Result<()> {
        let alice_dbfile = tempfile::NamedTempFile::new()?;
        let alice_store = store::fs::Store::persistent(alice_dbfile.path())?;
        test_replica_remove(alice_store)
    }
    fn test_replica_remove(mut store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let namespace = NamespaceSecret::new(&mut rng);
        let author = Author::new(&mut rng);
        let mut replica = store.new_replica(namespace.clone())?;
        let hash = replica.hash_and_insert(b"foo", &author, b"bar")?;
        let res = store
            .get_many(namespace.id(), Query::all())?
            .collect::<Vec<_>>();
        assert_eq!(res.len(), 1);
        let res = store.remove_replica(&namespace.id());
        assert!(res.is_err());
        store.close_replica(namespace.id());
        store.remove_replica(&namespace.id())?;
        let res = store
            .get_many(namespace.id(), Query::all())?
            .collect::<Vec<_>>();
        assert_eq!(res.len(), 0);
        let res = store.load_replica_info(&namespace.id());
        assert!(matches!(res, Err(OpenError::NotFound)));
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(b"foo", &author, hash, 3)?;
        let res = store
            .get_many(namespace.id(), Query::all())?
            .collect::<Vec<_>>();
        assert_eq!(res.len(), 1);
        Ok(())
    }
    #[test]
    fn test_replica_delete_edge_cases_memory() -> Result<()> {
        let store = store::Store::memory();
        test_replica_delete_edge_cases(store)
    }
    #[test]
    fn test_replica_delete_edge_cases_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_replica_delete_edge_cases(store)
    }
    fn test_replica_delete_edge_cases(mut store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let edgecases = [0u8, 1u8, 255u8];
        let prefixes = [0u8, 255u8];
        let hash = Hash::new(b"foo");
        let len = 3;
        for prefix in prefixes {
            let mut expected = vec![];
            let mut replica = store.new_replica(namespace.clone())?;
            for suffix in edgecases {
                let key = [prefix, suffix].to_vec();
                expected.push(key.clone());
                replica.insert(&key, &author, hash, len)?;
            }
            assert_keys(&mut store, namespace.id(), expected);
            let mut replica = store.new_replica(namespace.clone())?;
            replica.delete_prefix([prefix], &author)?;
            assert_keys(&mut store, namespace.id(), vec![]);
        }
        let mut replica = store.new_replica(namespace.clone())?;
        let key = vec![1u8, 0u8];
        replica.insert(key, &author, hash, len)?;
        let key = vec![1u8, 1u8];
        replica.insert(key, &author, hash, len)?;
        let key = vec![1u8, 2u8];
        replica.insert(key, &author, hash, len)?;
        let prefix = vec![1u8, 1u8];
        replica.delete_prefix(prefix, &author)?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
        );
        let mut replica = store.new_replica(namespace.clone())?;
        let key = vec![0u8, 255u8];
        replica.insert(key, &author, hash, len)?;
        let key = vec![0u8, 0u8];
        replica.insert(key, &author, hash, len)?;
        let prefix = vec![0u8];
        replica.delete_prefix(prefix, &author)?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
        );
        Ok(())
    }
    #[test]
    fn test_latest_iter_memory() -> Result<()> {
        let store = store::Store::memory();
        test_latest_iter(store)
    }
    #[test]
    fn test_latest_iter_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_latest_iter(store)
    }
    fn test_latest_iter(mut store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let author0 = Author::new(&mut rng);
        let author1 = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let mut replica = store.new_replica(namespace.clone())?;
        replica.hash_and_insert(b"a0.1", &author0, b"hi")?;
        let latest = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;
        assert_eq!(latest.len(), 1);
        assert_eq!(latest[0].2, b"a0.1".to_vec());
        let mut replica = store.new_replica(namespace.clone())?;
        replica.hash_and_insert(b"a1.1", &author1, b"hi")?;
        replica.hash_and_insert(b"a0.2", &author0, b"hi")?;
        let latest = store
            .get_latest_for_each_author(namespace.id())?
            .collect::<Result<Vec<_>>>()?;
        let mut latest_keys: Vec<Vec<u8>> = latest.iter().map(|r| r.2.to_vec()).collect();
        latest_keys.sort();
        assert_eq!(latest_keys, vec![b"a0.2".to_vec(), b"a1.1".to_vec()]);
        Ok(())
    }
    #[test]
    fn test_replica_byte_keys_memory() -> Result<()> {
        let store = store::Store::memory();
        test_replica_byte_keys(store)?;
        Ok(())
    }
    #[test]
    fn test_replica_byte_keys_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_replica_byte_keys(store)?;
        Ok(())
    }
    fn test_replica_byte_keys(mut store: Store) -> Result<()> {
        let mut rng = rand::thread_rng();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let hash = Hash::new(b"foo");
        let len = 3;
        let key = vec![1u8, 0u8];
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(key, &author, hash, len)?;
        assert_keys(&mut store, namespace.id(), vec![vec![1u8, 0u8]]);
        let key = vec![1u8, 2u8];
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(key, &author, hash, len)?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8]],
        );
        let key = vec![0u8, 255u8];
        let mut replica = store.new_replica(namespace.clone())?;
        replica.insert(key, &author, hash, len)?;
        assert_keys(
            &mut store,
            namespace.id(),
            vec![vec![1u8, 0u8], vec![1u8, 2u8], vec![0u8, 255u8]],
        );
        Ok(())
    }
    #[test]
    fn test_replica_capability_memory() -> Result<()> {
        let store = store::Store::memory();
        test_replica_capability(store)
    }
    #[test]
    fn test_replica_capability_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_replica_capability(store)
    }
    #[allow(clippy::redundant_pattern_matching)]
    fn test_replica_capability(mut store: Store) -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let author = store.new_author(&mut rng)?;
        let namespace = NamespaceSecret::new(&mut rng);
        let capability = Capability::Read(namespace.id());
        store.import_namespace(capability)?;
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar");
        assert!(matches!(res, Err(InsertError::ReadOnly)));
        let capability = Capability::Write(namespace.clone());
        store.import_namespace(capability)?;
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar");
        assert!(matches!(res, Ok(_)));
        store.close_replica(namespace.id());
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar");
        assert!(res.is_ok());
        let capability = Capability::Read(namespace.id());
        store.import_namespace(capability)?;
        store.close_replica(namespace.id());
        let mut replica = store.open_replica(&namespace.id())?;
        let res = replica.hash_and_insert(b"foo", &author, b"bar");
        assert!(res.is_ok());
        Ok(())
    }
    #[tokio::test]
    async fn test_actor_capability_memory() -> Result<()> {
        let store = store::Store::memory();
        test_actor_capability(store).await
    }
    #[tokio::test]
    async fn test_actor_capability_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_actor_capability(store).await
    }
    async fn test_actor_capability(store: Store) -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let author = Author::new(&mut rng);
        let handle = SyncHandle::spawn(store, None, "test".into());
        let author = handle.import_author(author).await?;
        let namespace = NamespaceSecret::new(&mut rng);
        let id = namespace.id();
        let capability = Capability::Read(namespace.id());
        handle.import_namespace(capability).await?;
        handle.open(namespace.id(), Default::default()).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_err());
        let capability = Capability::Write(namespace.clone());
        handle.import_namespace(capability).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_ok());
        handle.close(namespace.id()).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_err());
        handle.open(namespace.id(), Default::default()).await?;
        let res = handle
            .insert_local(id, author, b"foo".to_vec().into(), Hash::new(b"bar"), 3)
            .await;
        assert!(res.is_ok());
        Ok(())
    }
    fn drain(events: async_channel::Receiver<Event>) -> Vec<Event> {
        let mut res = vec![];
        while let Ok(ev) = events.try_recv() {
            res.push(ev);
        }
        res
    }
    #[test]
    fn test_replica_no_wrong_remote_insert_events() -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let mut store1 = store::Store::memory();
        let mut store2 = store::Store::memory();
        let peer1 = [1u8; 32];
        let peer2 = [2u8; 32];
        let mut state1 = SyncOutcome::default();
        let mut state2 = SyncOutcome::default();
        let author = Author::new(&mut rng);
        let namespace = NamespaceSecret::new(&mut rng);
        let mut replica1 = store1.new_replica(namespace.clone())?;
        let mut replica2 = store2.new_replica(namespace.clone())?;
        let (events1_sender, events1) = async_channel::bounded(32);
        let (events2_sender, events2) = async_channel::bounded(32);
        replica1.info.subscribe(events1_sender);
        replica2.info.subscribe(events2_sender);
        replica1.hash_and_insert(b"foo", &author, b"init")?;
        let from1 = replica1.sync_initial_message()?;
        let from2 = replica2
            .sync_process_message(from1, peer1, &mut state2)
            .unwrap()
            .unwrap();
        let from1 = replica1
            .sync_process_message(from2, peer2, &mut state1)
            .unwrap()
            .unwrap();
        replica2.hash_and_insert(b"foo", &author, b"update")?;
        let from2 = replica2
            .sync_process_message(from1, peer1, &mut state2)
            .unwrap();
        assert!(from2.is_none());
        let events1 = drain(events1);
        let events2 = drain(events2);
        assert_eq!(events1.len(), 1);
        assert_eq!(events2.len(), 1);
        assert!(matches!(events1[0], Event::LocalInsert { .. }));
        assert!(matches!(events2[0], Event::LocalInsert { .. }));
        assert_eq!(state1.num_sent, 1);
        assert_eq!(state1.num_recv, 0);
        assert_eq!(state2.num_sent, 0);
        assert_eq!(state2.num_recv, 1);
        Ok(())
    }
    #[test]
    fn test_replica_queries_mem() -> Result<()> {
        let store = store::Store::memory();
        test_replica_queries(store)?;
        Ok(())
    }
    #[test]
    fn test_replica_queries_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let store = store::fs::Store::persistent(dbfile.path())?;
        test_replica_queries(store)?;
        Ok(())
    }
    fn test_replica_queries(mut store: Store) -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let namespace = NamespaceSecret::new(&mut rng);
        let namespace_id = namespace.id();
        let a1 = store.new_author(&mut rng)?;
        let a2 = store.new_author(&mut rng)?;
        let a3 = store.new_author(&mut rng)?;
        println!(
            "a1 {} a2 {} a3 {}",
            a1.id().fmt_short(),
            a2.id().fmt_short(),
            a3.id().fmt_short()
        );
        let mut replica = store.new_replica(namespace.clone())?;
        replica.hash_and_insert("hi/world", &a2, "a2")?;
        replica.hash_and_insert("hi/world", &a1, "a1")?;
        replica.hash_and_insert("hi/moon", &a2, "a1")?;
        replica.hash_and_insert("hi", &a3, "a3")?;
        struct QueryTester<'a> {
            store: &'a mut Store,
            namespace: NamespaceId,
        }
        impl<'a> QueryTester<'a> {
            fn assert(&mut self, query: impl Into<Query>, expected: Vec<(&'static str, &Author)>) {
                let query = query.into();
                let actual = self
                    .store
                    .get_many(self.namespace, query.clone())
                    .unwrap()
                    .map(|e| e.map(|e| (String::from_utf8(e.key().to_vec()).unwrap(), e.author())))
                    .collect::<Result<Vec<_>>>()
                    .unwrap();
                let expected = expected
                    .into_iter()
                    .map(|(key, author)| (key.to_string(), author.id()))
                    .collect::<Vec<_>>();
                assert_eq!(actual, expected, "query: {query:#?}")
            }
        }
        let mut qt = QueryTester {
            store: &mut store,
            namespace: namespace_id,
        };
        qt.assert(
            Query::all(),
            vec![
                ("hi/world", &a1),
                ("hi/moon", &a2),
                ("hi/world", &a2),
                ("hi", &a3),
            ],
        );
        qt.assert(
            Query::single_latest_per_key(),
            vec![("hi", &a3), ("hi/moon", &a2), ("hi/world", &a1)],
        );
        qt.assert(
            Query::single_latest_per_key().sort_direction(SortDirection::Desc),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
        );
        qt.assert(
            Query::single_latest_per_key().key_prefix("hi/"),
            vec![("hi/moon", &a2), ("hi/world", &a1)],
        );
        qt.assert(
            Query::single_latest_per_key()
                .key_prefix("hi/")
                .sort_direction(SortDirection::Desc),
            vec![("hi/world", &a1), ("hi/moon", &a2)],
        );
        qt.assert(
            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Asc),
            vec![
                ("hi", &a3),
                ("hi/moon", &a2),
                ("hi/world", &a1),
                ("hi/world", &a2),
            ],
        );
        qt.assert(
            Query::all().sort_by(SortBy::KeyAuthor, SortDirection::Desc),
            vec![
                ("hi/world", &a2),
                ("hi/world", &a1),
                ("hi/moon", &a2),
                ("hi", &a3),
            ],
        );
        qt.assert(
            Query::all().key_prefix("hi/"),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
        );
        qt.assert(
            Query::all().key_prefix("hi/").offset(1).limit(1),
            vec![("hi/moon", &a2)],
        );
        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::KeyAuthor, SortDirection::Desc),
            vec![("hi/world", &a2), ("hi/world", &a1), ("hi/moon", &a2)],
        );
        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::KeyAuthor, SortDirection::Desc)
                .offset(1)
                .limit(1),
            vec![("hi/world", &a1)],
        );
        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::AuthorKey, SortDirection::Asc),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi/world", &a2)],
        );
        qt.assert(
            Query::all()
                .key_prefix("hi/")
                .sort_by(SortBy::AuthorKey, SortDirection::Desc),
            vec![("hi/world", &a2), ("hi/moon", &a2), ("hi/world", &a1)],
        );
        qt.assert(
            Query::all()
                .sort_by(SortBy::KeyAuthor, SortDirection::Asc)
                .limit(2)
                .offset(1),
            vec![("hi/moon", &a2), ("hi/world", &a1)],
        );
        let mut replica = store.new_replica(namespace)?;
        replica.delete_prefix("hi/world", &a2)?;
        let mut qt = QueryTester {
            store: &mut store,
            namespace: namespace_id,
        };
        qt.assert(
            Query::all(),
            vec![("hi/world", &a1), ("hi/moon", &a2), ("hi", &a3)],
        );
        qt.assert(
            Query::all().include_empty(),
            vec![
                ("hi/world", &a1),
                ("hi/moon", &a2),
                ("hi/world", &a2),
                ("hi", &a3),
            ],
        );
        Ok(())
    }
    #[test]
    fn test_dl_policies_mem() -> Result<()> {
        let mut store = store::Store::memory();
        test_dl_policies(&mut store)
    }
    #[test]
    fn test_dl_policies_fs() -> Result<()> {
        let dbfile = tempfile::NamedTempFile::new()?;
        let mut store = store::fs::Store::persistent(dbfile.path())?;
        test_dl_policies(&mut store)
    }
    fn test_dl_policies(store: &mut Store) -> Result<()> {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let namespace = NamespaceSecret::new(&mut rng);
        let id = namespace.id();
        let filter = store::FilterKind::Exact("foo".into());
        let policy = store::DownloadPolicy::NothingExcept(vec![filter]);
        store
            .set_download_policy(&id, policy.clone())
            .expect_err("document dos not exist");
        store.new_replica(namespace)?;
        store.set_download_policy(&id, policy.clone())?;
        let retrieved_policy = store.get_download_policy(&id)?;
        assert_eq!(retrieved_policy, policy);
        Ok(())
    }
    fn assert_keys(store: &mut Store, namespace: NamespaceId, mut expected: Vec<Vec<u8>>) {
        expected.sort();
        assert_eq!(expected, get_keys_sorted(store, namespace));
    }
    fn get_keys_sorted(store: &mut Store, namespace: NamespaceId) -> Vec<Vec<u8>> {
        let mut res = store
            .get_many(namespace, Query::all())
            .unwrap()
            .map(|e| e.map(|e| e.key().to_vec()))
            .collect::<Result<Vec<_>>>()
            .unwrap();
        res.sort();
        res
    }
    fn get_entry(
        store: &mut Store,
        namespace: NamespaceId,
        author: AuthorId,
        key: &[u8],
    ) -> anyhow::Result<SignedEntry> {
        let entry = store
            .get_exact(namespace, author, key, true)?
            .ok_or_else(|| anyhow::anyhow!("not found"))?;
        Ok(entry)
    }
    fn get_content_hash(
        store: &mut Store,
        namespace: NamespaceId,
        author: AuthorId,
        key: &[u8],
    ) -> anyhow::Result<Option<Hash>> {
        let hash = store
            .get_exact(namespace, author, key, false)?
            .map(|e| e.content_hash());
        Ok(hash)
    }
    fn sync(alice: &mut Replica, bob: &mut Replica) -> Result<(SyncOutcome, SyncOutcome)> {
        let alice_peer_id = [1u8; 32];
        let bob_peer_id = [2u8; 32];
        let mut alice_state = SyncOutcome::default();
        let mut bob_state = SyncOutcome::default();
        let mut next_to_bob = Some(alice.sync_initial_message()?);
        let mut rounds = 0;
        while let Some(msg) = next_to_bob.take() {
            assert!(rounds < 100, "too many rounds");
            rounds += 1;
            println!("round {}", rounds);
            if let Some(msg) = bob.sync_process_message(msg, alice_peer_id, &mut bob_state)? {
                next_to_bob = alice.sync_process_message(msg, bob_peer_id, &mut alice_state)?
            }
        }
        assert_eq!(alice_state.num_sent, bob_state.num_recv);
        assert_eq!(alice_state.num_recv, bob_state.num_sent);
        Ok((alice_state, bob_state))
    }
    fn check_entries(
        store: &mut Store,
        namespace: &NamespaceId,
        author: &Author,
        set: &[&str],
    ) -> Result<()> {
        for el in set {
            store.get_exact(*namespace, author.id(), el, false)?;
        }
        Ok(())
    }
}