use crate::error::JasonError;
use crate::sources::Source;
use crate::Database;
use humphrey_json::prelude::*;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{spawn, JoinHandle};
/// A destination that key/value writes can be replicated to.
///
/// Implementors must be `Send + 'static` (supertrait bounds); `Replicator::new_async`
/// relies on this to move a replica into a spawned thread.
///
/// The type parameter `T` does not appear in the method signature.
/// NOTE(review): presumably it is the datum type of the database being replicated — confirm.
pub trait Replica<T>: Send + 'static {
/// Stores `value` under `key` in this replica, returning a `JasonError` on failure.
fn set(&mut self, key: &str, value: &str) -> Result<(), JasonError>;
}
/// Forwards writes to a replica, either inline or through a background thread.
pub(crate) enum Replicator<T> {
/// The boxed replica is held directly; `set` calls it on the caller's thread.
Sync(Box<dyn Replica<T> + Send>),
/// The replica was moved into a worker thread and is fed through a channel.
Async {
/// Handle of the worker thread. Wrapped in `Option` so that `Drop` can `take()`
/// it out of the `&mut self` borrow in order to join it.
thread: Option<JoinHandle<()>>,
/// Sending half of the channel the worker thread reads its messages from.
sender: Sender<ReplicationMessage>,
},
}
/// Messages sent to the worker thread of an asynchronous `Replicator`.
pub(crate) enum ReplicationMessage {
/// Write a value to the replica: the first field is the key, the second the value.
Replicate(String, String),
/// Stop processing messages so the worker thread can finish.
Shutdown,
}
impl<T> Replicator<T>
where
T: 'static,
{
pub fn new<R>(replica: R) -> Self
where
R: Replica<T>,
{
Self::Sync(Box::new(replica))
}
pub fn new_async<R>(mut replica: R) -> Self
where
R: Replica<T>,
{
let (tx, rx): (Sender<ReplicationMessage>, Receiver<ReplicationMessage>) = channel();
let handle = spawn(move || {
for msg in rx {
match msg {
ReplicationMessage::Replicate(key, value) => {
replica.set(&key, &value).unwrap();
}
ReplicationMessage::Shutdown => {
break;
}
}
}
});
Self::Async {
thread: Some(handle),
sender: tx,
}
}
pub fn set(&mut self, key: &str, value: &str) -> Result<(), JasonError> {
match self {
Self::Sync(replica) => replica.set(key, value),
Self::Async { sender, .. } => {
let msg = ReplicationMessage::Replicate(key.to_string(), value.to_string());
sender.send(msg).map_err(|_| JasonError::ReplicaError)?;
Ok(())
}
}
}
}
impl<T> Drop for Replicator<T> {
    /// Shuts down the worker thread of an asynchronous replicator.
    ///
    /// A `Shutdown` message is sent and the worker is joined, so writes queued
    /// before the drop are applied before this returns. Synchronous replicators
    /// need no teardown.
    ///
    /// Teardown is best-effort and never panics. Panicking inside `drop` while
    /// the thread is already unwinding would abort the process.
    fn drop(&mut self) {
        if let Self::Async { thread, sender } = self {
            // A send error only means the worker has already exited (its
            // receiver is gone), so there is nothing left to shut down.
            let _ = sender.send(ReplicationMessage::Shutdown);

            if let Some(worker) = thread.take() {
                // `join` returns `Err` when the worker panicked. That panic has
                // already been reported by the worker itself, so it is not
                // re-raised here.
                let _ = worker.join();
            }
        }
    }
}
impl<T, S> Replica<T> for Database<T, S>
where
    T: IntoJson + FromJson + Send + 'static,
    S: Source + Send + 'static,
{
    /// Applies a replicated write to this database by passing the UTF-8 bytes of
    /// `value` to `set_raw` under `key`, and returns that call's result.
    fn set(&mut self, key: &str, value: &str) -> Result<(), JasonError> {
        let raw = value.as_bytes();
        self.set_raw(key, raw)
    }
}