native_db-32bit 0.5.4

Drop-in embedded database
Documentation
use crate::db_type::{
    DatabaseSecondaryKeyOptions, Error, InnerKeyValue, Input, KeyDefinition, Result,
};
use crate::watch;
use crate::watch::{MpscReceiver, TableFilter};
#[cfg(not(target_has_atomic = "64"))]
use portable_atomic::AtomicU64;
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, RwLock};

pub(crate) struct InternalWatch<'db> {
    pub(crate) watchers: &'db Arc<RwLock<watch::Watchers>>,
    pub(crate) watchers_counter_id: &'db AtomicU64,
}

impl InternalWatch<'_> {
    fn watch_generic(
        &self,
        table_filter: watch::TableFilter,
    ) -> Result<(MpscReceiver<watch::Event>, u64)> {
        #[cfg(not(feature = "tokio"))]
        let (event_sender, event_receiver) = std::sync::mpsc::channel();
        #[cfg(feature = "tokio")]
        let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
        let event_sender = Arc::new(Mutex::new(event_sender));
        let id = self.generate_watcher_id()?;
        let mut watchers = self.watchers.write().unwrap();
        watchers.add_sender(id, &table_filter, Arc::clone(&event_sender));
        drop(watchers);
        Ok((event_receiver, id))
    }

    fn generate_watcher_id(&self) -> Result<u64> {
        #[cfg(not(target_has_atomic = "64"))]
        let value = self
            .watchers_counter_id
            .fetch_add(1, portable_atomic::Ordering::SeqCst);
        #[cfg(target_has_atomic = "64")]
        let value = self
            .watchers_counter_id
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        if value == u64::MAX {
            Err(Error::MaxWatcherReached.into())
        } else {
            Ok(value)
        }
    }

    pub(crate) fn watch_primary<T: Input>(
        &self,
        key: impl InnerKeyValue,
    ) -> Result<(MpscReceiver<watch::Event>, u64)> {
        let table_name = T::native_db_model().primary_key;
        let key = key.database_inner_key_value();
        let table_filter =
            TableFilter::new_primary(table_name.unique_table_name.clone(), Some(key));
        self.watch_generic(table_filter)
    }

    pub(crate) fn watch_primary_all<T: Input>(&self) -> Result<(MpscReceiver<watch::Event>, u64)> {
        let table_name = T::native_db_model().primary_key;
        let table_filter = TableFilter::new_primary(table_name.unique_table_name.clone(), None);
        self.watch_generic(table_filter)
    }

    pub(crate) fn watch_primary_start_with<T: Input>(
        &self,
        start_with: impl InnerKeyValue,
    ) -> Result<(MpscReceiver<watch::Event>, u64)> {
        let table_name = T::native_db_model().primary_key;
        let start_with = start_with.database_inner_key_value();
        let table_filter =
            TableFilter::new_primary_start_with(table_name.unique_table_name.clone(), start_with);
        self.watch_generic(table_filter)
    }

    pub(crate) fn watch_secondary<T: Input>(
        &self,
        key_def: &impl KeyDefinition<DatabaseSecondaryKeyOptions>,
        key: impl InnerKeyValue,
    ) -> Result<(MpscReceiver<watch::Event>, u64)> {
        let table_name = T::native_db_model().primary_key;
        let key = key.database_inner_key_value();
        let table_filter =
            TableFilter::new_secondary(table_name.unique_table_name.clone(), key_def, Some(key));
        self.watch_generic(table_filter)
    }

    pub(crate) fn watch_secondary_all<T: Input>(
        &self,
        key_def: &impl KeyDefinition<DatabaseSecondaryKeyOptions>,
    ) -> Result<(MpscReceiver<watch::Event>, u64)> {
        let table_name = T::native_db_model().primary_key;
        let table_filter =
            TableFilter::new_secondary(table_name.unique_table_name.clone(), key_def, None);
        self.watch_generic(table_filter)
    }

    pub(crate) fn watch_secondary_start_with<T: Input>(
        &self,
        key_def: &impl KeyDefinition<DatabaseSecondaryKeyOptions>,
        start_with: impl InnerKeyValue,
    ) -> Result<(MpscReceiver<watch::Event>, u64)> {
        let table_name = T::native_db_model().primary_key;
        let start_with = start_with.database_inner_key_value();
        let table_filter = TableFilter::new_secondary_start_with(
            table_name.unique_table_name.clone(),
            key_def,
            start_with,
        );
        self.watch_generic(table_filter)
    }
}