use std::collections::{HashSet, BTreeMap};
use crate::message::{Message, Put, Get};
use crate::actor::{Actor, ActorContext};
use crate::Config;
use crate::types::*;
use async_trait::async_trait;
use log::{debug, error, info};
use sled;
macro_rules! unwrap_or_return {
( $e:expr ) => {
match $e {
Ok(x) => x,
Err(e) => {
error!("{}", e);
return;
},
}
}
}
pub struct SledStorage {
store: sled::Db,
}
impl SledStorage {
pub fn new(config: Config) -> Self {
let store = config.sled_config.open().unwrap();
SledStorage {
store, }
}
fn handle_get(&self, get: Get, ctx: &ActorContext) {
let res = unwrap_or_return!(self.store.get(&get.node_id.clone()));
if res.is_none() {
debug!("have not {}", get.node_id); return;
}
let children = res.unwrap();
debug!("have {}: {:?}", get.node_id, children);
let children = unwrap_or_return!(self.store.open_tree(&get.node_id));
let reply_with_children = match &get.child_key {
Some(child_key) => { let child_val = unwrap_or_return!(children.get(child_key));
if child_val.is_none() { debug!("requested child {} not found", child_key); return; }
debug!("requested child {} was found", child_key);
let child_val = unwrap_or_return!(bincode::deserialize::<NodeData>(&child_val.unwrap()));
let mut r = BTreeMap::new();
r.insert(child_key.clone(), child_val);
r
},
None => { let mut r = BTreeMap::new();
for child in children.iter() {
let (k,v) = unwrap_or_return!(child);
let k = unwrap_or_return!(std::str::from_utf8(&k)).to_string();
let v: NodeData = unwrap_or_return!(bincode::deserialize::<NodeData>(&v));
r.insert(k, v);
}
r
}
};
let mut reply_with_nodes = BTreeMap::new();
reply_with_nodes.insert(get.node_id.clone(), reply_with_children);
let mut recipients = HashSet::new();
recipients.insert(get.from.clone());
debug!("direct replying to {}", get.from);
let my_addr = ctx.addr.clone();
let put = Put::new(reply_with_nodes, Some(get.id.clone()), my_addr);
let _ = get.from.sender.send(Message::Put(put));
}
fn handle_put(&self, put: Put) {
for (node_id, update_data) in put.updated_nodes.iter().rev() {
debug!("saving k-v {}: {:?}", node_id, update_data);
let res = unwrap_or_return!(self.store.get(node_id));
if let Some(_children) = res {
let children = unwrap_or_return!(self.store.open_tree(node_id));
for (child_id, child_data) in update_data {
if let Some(existing) = unwrap_or_return!(children.get(child_id)) {
let existing = unwrap_or_return!(bincode::deserialize::<NodeData>(&existing));
if child_data.updated_at >= existing.updated_at {
let _ = children.insert(child_id, bincode::serialize(child_data).unwrap());
}
} else {
let _ = children.insert(child_id, bincode::serialize(child_data).unwrap());
}
}
} else {
let _ = self.store.insert(node_id, vec![1]);
let children = unwrap_or_return!(self.store.open_tree(node_id));
for (child_id, child_data) in update_data {
let _ = children.insert(child_id, bincode::serialize(child_data).unwrap());
}
}
}
}
}
#[async_trait]
impl Actor for SledStorage {
async fn handle(&mut self, message: Message, ctx: &ActorContext) {
debug!("SledStorage incoming message {:?}", message);
match message {
Message::Get(get) => self.handle_get(get, ctx),
Message::Put(put) => self.handle_put(put),
_ => {}
}
}
async fn pre_start(&mut self, _context: &ActorContext) {
info!("SledStorage adapter starting");
}
async fn stopping(&mut self, _context: &ActorContext) {
info!("SledStorage adapter stopping");
}
}