coerce 0.8.6

Async actor runtime and distributed systems framework
Documentation
use crate::persistent::journal::storage::JournalStorageRef;
use std::sync::Arc;

pub trait StorageProvider: 'static + Send + Sync {
    fn journal_storage(&self) -> Option<JournalStorageRef>;
}

pub type StorageProviderRef = Arc<dyn StorageProvider>;

pub mod inmemory {
    use crate::persistent::journal::provider::StorageProvider;
    use crate::persistent::journal::storage::{JournalEntry, JournalStorage, JournalStorageRef};
    use parking_lot::RwLock;
    use std::collections::hash_map::Entry;
    use std::collections::HashMap;
    use std::mem;
    use std::sync::Arc;

    #[derive(Debug)]
    struct ActorJournal {
        snapshots: Vec<JournalEntry>,
        messages: Vec<JournalEntry>,
    }

    impl ActorJournal {
        pub fn from_snapshot(entry: JournalEntry) -> ActorJournal {
            ActorJournal {
                snapshots: vec![entry],
                messages: vec![],
            }
        }

        pub fn from_message(entry: JournalEntry) -> ActorJournal {
            ActorJournal {
                snapshots: vec![],
                messages: vec![entry],
            }
        }

        pub fn from_messages(messages: Vec<JournalEntry>) -> ActorJournal {
            ActorJournal {
                snapshots: vec![],
                messages,
            }
        }
    }

    #[derive(Default)]
    pub struct InMemoryJournalStorage {
        store: RwLock<HashMap<String, ActorJournal>>,
    }

    #[derive(Default)]
    pub struct InMemoryStorageProvider {
        store: Arc<InMemoryJournalStorage>,
    }

    impl InMemoryStorageProvider {
        pub fn new() -> InMemoryStorageProvider {
            Self::default()
        }
    }

    impl StorageProvider for InMemoryStorageProvider {
        fn journal_storage(&self) -> Option<JournalStorageRef> {
            Some(self.store.clone())
        }
    }

    #[async_trait]
    impl JournalStorage for InMemoryJournalStorage {
        async fn write_snapshot(
            &self,
            persistence_id: &str,
            entry: JournalEntry,
        ) -> anyhow::Result<()> {
            let mut store = self.store.write();
            if let Some(journal) = store.get_mut(persistence_id) {
                journal.snapshots.push(entry);
            } else {
                store.insert(
                    persistence_id.to_string(),
                    ActorJournal::from_snapshot(entry),
                );
            }

            Ok(())
        }

        async fn write_message(
            &self,
            persistence_id: &str,
            entry: JournalEntry,
        ) -> anyhow::Result<()> {
            let mut store = self.store.write();
            if let Some(journal) = store.get_mut(persistence_id) {
                journal.messages.push(entry);
            } else {
                store.insert(
                    persistence_id.to_string(),
                    ActorJournal::from_message(entry),
                );
            }

            Ok(())
        }

        async fn write_message_batch(
            &self,
            persistence_id: &str,
            entries: Vec<JournalEntry>,
        ) -> anyhow::Result<()> {
            let mut store = self.store.write();
            if let Some(journal) = store.get_mut(persistence_id) {
                let mut entries = entries;
                journal.messages.append(&mut entries);
            } else {
                store.insert(
                    persistence_id.to_string(),
                    ActorJournal::from_messages(entries),
                );
            }

            Ok(())
        }

        async fn read_latest_snapshot(
            &self,
            persistence_id: &str,
        ) -> anyhow::Result<Option<JournalEntry>> {
            let store = self.store.read();
            Ok(store
                .get(persistence_id)
                .and_then(|j| j.snapshots.last().cloned()))
        }

        async fn read_latest_messages(
            &self,
            persistence_id: &str,
            from_sequence: i64,
        ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
            let store = self.store.read();
            Ok(store.get(persistence_id).map(|journal| {
                let messages = match from_sequence {
                    0 => journal.messages.clone(),
                    from_sequence => {
                        let starting_message = journal
                            .messages
                            .iter()
                            .enumerate()
                            .find(|(_index, j)| j.sequence > from_sequence)
                            .map(|(index, _j)| index);

                        if let Some(starting_index) = starting_message {
                            journal.messages[starting_index..].iter().cloned().collect()
                        } else {
                            vec![]
                        }
                    }
                };

                trace!(
                    "storage found {} messages for persistence_id={}, from_sequence={}",
                    messages.len(),
                    persistence_id,
                    from_sequence
                );

                messages
            }))
        }

        async fn read_message(
            &self,
            persistence_id: &str,
            sequence_id: i64,
        ) -> anyhow::Result<Option<JournalEntry>> {
            let mut store = self.store.read();
            let journal = store.get(persistence_id);
            match journal {
                None => Ok(None),
                Some(journal) => Ok(journal
                    .messages
                    .iter()
                    .find(|n| n.sequence == sequence_id)
                    .cloned()),
            }
        }

        async fn read_messages(
            &self,
            persistence_id: &str,
            from_sequence: i64,
            to_sequence: i64,
        ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
            let mut store = self.store.read();
            let journal = store.get(persistence_id);
            match journal {
                None => Ok(None),
                Some(journal) => {
                    if journal.messages.is_empty() {
                        Ok(None)
                    } else {
                        let first_seq = journal.messages.first().map(|m| m.sequence).unwrap();
                        let final_seq = journal.messages.last().map(|m| m.sequence).unwrap();

                        if to_sequence >= final_seq {
                            if first_seq >= from_sequence {
                                Ok(Some(journal.messages.clone()))
                            } else {
                                let starting_message = journal
                                    .messages
                                    .iter()
                                    .enumerate()
                                    .find(|(_index, j)| j.sequence > from_sequence)
                                    .map(|(index, _j)| index);

                                if let Some(starting_index) = starting_message {
                                    Ok(Some(
                                        journal.messages[starting_index..]
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    ))
                                } else {
                                    Ok(Some(vec![]))
                                }
                            }
                        } else if first_seq >= from_sequence {
                            let end_message = journal
                                .messages
                                .iter()
                                .enumerate()
                                .find(|(_index, j)| j.sequence > from_sequence)
                                .map(|(index, _j)| index);

                            if let Some(end_index) = end_message {
                                Ok(Some(
                                    journal.messages[..end_index].iter().cloned().collect(),
                                ))
                            } else {
                                Ok(Some(vec![]))
                            }
                        } else {
                            panic!("todo: this")
                        }
                    }
                }
            }
        }

        async fn delete_messages_to(
            &self,
            persistence_id: &str,
            to_sequence: i64,
        ) -> anyhow::Result<()> {
            let mut store = self.store.write();
            let mut journal = store.entry(persistence_id.to_string());
            if let Entry::Occupied(mut journal) = journal {
                let journal = journal.get_mut();

                fn get_messages_to(
                    to_sequence: i64,
                    journal: &mut ActorJournal,
                ) -> Vec<JournalEntry> {
                    let starting_message = journal
                        .messages
                        .iter()
                        .enumerate()
                        .find(|(_index, j)| j.sequence >= to_sequence)
                        .map(|(index, _j)| index);

                    starting_message.map_or_else(|| vec![], |m| journal.messages.split_off(m))
                }

                let messages = if let Some(newest_msg) = journal.messages.last() {
                    if newest_msg.sequence < to_sequence {
                        vec![]
                    } else {
                        get_messages_to(to_sequence, journal)
                    }
                } else {
                    get_messages_to(to_sequence, journal)
                };

                *journal = ActorJournal {
                    snapshots: mem::take(&mut journal.snapshots),
                    messages,
                };
            }

            Ok(())
        }

        async fn delete_all(&self, persistence_id: &str) -> anyhow::Result<()> {
            let mut store = self.store.write();
            store.remove(persistence_id);
            Ok(())
        }
    }
}