use crate::actor::{Actor, ActorContext, Addr};
use crate::message::{Get, Message, Put};
use crate::router::Router;
use crate::types::{Children, NodeData, Value};
use crate::utils::random_string;
use crate::adapters::MemoryStorage;
use async_trait::async_trait;
use log::{debug, info};
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use std::time::SystemTime; use tokio::sync::broadcast;
static BROADCAST_CHANNEL_SIZE: usize = 10;
#[derive(Clone)]
pub struct Config {
pub allow_public_space: bool,
pub my_pub: Option<String>,
pub stats: bool,
}
impl Default for Config {
fn default() -> Self {
Config {
allow_public_space: true,
stats: true,
my_pub: None,
}
}
}
#[derive(Clone)]
pub struct Node {
uid: Arc<RwLock<String>>,
path: Vec<String>,
children: Arc<RwLock<BTreeMap<String, Node>>>,
parent: Arc<RwLock<Option<(String, Node)>>>,
on_sender: broadcast::Sender<Value>,
map_sender: broadcast::Sender<(String, Value)>,
actor_context: Box<ActorContext>,
addr: Arc<RwLock<Option<Addr>>>,
router: Arc<RwLock<Option<Addr>>>,
}
#[async_trait]
impl Actor for Node {
async fn handle(&mut self, msg: Message, _context: &ActorContext) {
match msg {
Message::Put(put) => self.handle_put(put),
_ => {}
}
}
}
impl Node {
pub fn new() -> Self {
let storage = MemoryStorage::new();
Self::new_with_config(Config::default(), vec![Box::new(storage)], Vec::new())
}
pub fn id(&self) -> String {
self.uid.read().unwrap().clone()
}
pub fn peer_id(&self) -> String {
self.actor_context.peer_id.read().unwrap().clone()
}
pub fn new_with_config(
config: Config,
storage_adapters: Vec<Box<dyn Actor>>,
network_adapters: Vec<Box<dyn Actor>>,
) -> Self {
let actor_context = ActorContext::new(random_string(16));
let mut node = Self {
path: vec![],
uid: Arc::new(RwLock::new("".to_string())),
children: Arc::new(RwLock::new(BTreeMap::new())),
parent: Arc::new(RwLock::new(None)),
on_sender: broadcast::channel::<Value>(BROADCAST_CHANNEL_SIZE).0,
map_sender: broadcast::channel::<(String, Value)>(BROADCAST_CHANNEL_SIZE).0,
addr: Arc::new(RwLock::new(None)),
router: Arc::new(RwLock::new(None)),
actor_context: Box::new(actor_context),
};
node.actor_context.node = Some(node.clone());
let addr = node.actor_context.start_actor(Box::new(node.clone()));
*node.addr.write().unwrap() = Some(addr);
let router = Box::new(Router::new(config, storage_adapters, network_adapters)); let router_addr = node.actor_context.start_router(router);
*node.router.write().unwrap() = Some(router_addr);
node
}
fn handle_put(&mut self, put: Put) {
for (node_id, node_data) in put.updated_nodes {
if node_id == *self.uid.read().unwrap() {
for (child, child_data) in node_data {
if let Some(child) = self.children.read().unwrap().get(&child) {
let _ = child.on_sender.send(child_data.value.clone());
}
let _ = self
.map_sender
.send((child.to_string(), child_data.value.clone()));
}
}
}
}
fn new_child(&self, key: String) -> Node {
assert!(key.len() > 0, "Key length must be greater than zero");
debug!("new child {}", key);
let mut path = self.path.clone();
path.push(key.clone());
let new_child_uid = path.join("/");
debug!("new_child_uid {}", new_child_uid);
let node = Self {
path,
children: Arc::new(RwLock::new(BTreeMap::new())),
parent: Arc::new(RwLock::new(Some((
self.uid.read().unwrap().clone(),
self.clone(),
)))),
on_sender: broadcast::channel::<Value>(BROADCAST_CHANNEL_SIZE).0,
map_sender: broadcast::channel::<(String, Value)>(BROADCAST_CHANNEL_SIZE).0,
uid: Arc::new(RwLock::new(new_child_uid)),
router: self.router.clone(),
addr: Arc::new(RwLock::new(None)),
actor_context: self.actor_context.clone(),
};
let addr = self.actor_context.start_actor(Box::new(node.clone()));
*node.addr.write().unwrap() = Some(addr);
self.children.write().unwrap().insert(key, node.clone());
node
}
pub fn on(&mut self) -> broadcast::Receiver<Value> {
let key;
if self.path.len() > 1 {
key = self.path.iter().nth(self.path.len() - 1).cloned();
} else {
key = None;
}
let addr;
let node_id;
if let Some((parent_id, parent)) = &*self.parent.read().unwrap() {
node_id = parent_id.clone();
addr = parent.addr.read().unwrap().clone().unwrap();
} else {
node_id = self.uid.read().unwrap().to_string();
addr = self.addr.read().unwrap().clone().unwrap();
}
let get = Get::new(node_id, key, addr);
if let Some(router) = self.router.read().unwrap().clone() {
let _ = router.send(Message::Get(get));
}
self.on_sender.subscribe()
}
pub fn get(&mut self, key: &str) -> Node {
if key == "" {
return self.clone();
}
debug!("get key {}", key);
if self.children.read().unwrap().contains_key(key) {
self.children.read().unwrap().get(key).unwrap().clone() } else {
self.new_child(key.to_string())
}
}
pub fn map(&self) -> broadcast::Receiver<(String, Value)> {
self.map_sender.subscribe()
}
fn add_parent_nodes(
&mut self,
updated_nodes: &mut BTreeMap<String, Children>,
value: Value,
updated_at: f64,
) {
let parent = &*self.parent.read().unwrap();
if let Some((parent_id, parent)) = parent {
if parent_id == "" {
return; }
let mut parent = parent.clone();
let mut children = Children::default();
children.insert(
self.path.last().unwrap().clone(),
NodeData {
value: value.clone(),
updated_at,
},
);
updated_nodes.insert(parent_id.to_string(), children);
parent.add_parent_nodes(updated_nodes, Value::Link(parent.id()), updated_at);
}
}
pub fn put(&mut self, value: Value) {
let updated_at: f64 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as f64;
self.on_sender.send(value.clone()).ok();
debug!("put {}", value.to_string());
let mut updated_nodes = BTreeMap::new();
self.add_parent_nodes(&mut updated_nodes, value, updated_at);
let my_addr = self.addr.read().unwrap().clone().unwrap();
let put = Put::new(updated_nodes, None, my_addr);
if let Some(router) = &*self.router.read().unwrap() {
let _ = router.send(Message::Put(put));
}
}
pub fn stop(&mut self) {
info!("Node stopping");
self.actor_context.stop();
}
}