1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::collections::{HashMap, HashSet, BTreeMap};
use crate::message::{Message, Put, Get};
use crate::actor::{Actor, ActorContext};
use crate::Config;
use crate::types::*;
use async_trait::async_trait;
use log::{debug, info};
use std::sync::{Arc, RwLock};
//use tokio::time::{sleep, Duration};
pub struct MemoryStorage {
config: Config,
graph_size_bytes: Arc<RwLock<usize>>,
store: Arc<RwLock<HashMap<String, Children>>>,
}
impl MemoryStorage {
pub fn new(config: Config) -> Self {
MemoryStorage {
config,
graph_size_bytes: Arc::new(RwLock::new(0)),
store: Arc::new(RwLock::new(HashMap::new())), // If we don't want to store everything in memory, this needs to use something like Redis or LevelDB. Or have a FileSystem adapter for persistence and evict the least important stuff from memory when it's full.
}
}
fn update_stats(&self) {
/*
let peer_id = self.node.get_peer_id();
let mut stats = self.node.clone().get("node_stats").get(&peer_id);
let store = self.store.clone();
let graph_size_bytes = self.graph_size_bytes.clone();
tokio::task::spawn(async move {
loop {
let count = store.read().unwrap().len().to_string();
let size = *graph_size_bytes.read().unwrap();
let size = format!("{}B", size_format::SizeFormatterBinary::new(size as u64).to_string());
stats.get("graph_node_count").put(count.into());
stats.get("graph_size_bytes").put(size.into());
sleep(Duration::from_millis(1000)).await;
}
});
*/
}
fn handle_get(&self, get: Get, ctx: &ActorContext) {
if let Some(children) = self.store.read().unwrap().get(&get.node_id).cloned() {
debug!("have {}: {:?}", get.node_id, children);
let reply_with_children = match &get.child_key {
Some(child_key) => { // reply with specific child if it's found
match children.get(child_key) {
Some(child_val) => {
let mut r = BTreeMap::new();
r.insert(child_key.clone(), child_val.clone());
r
},
None => { return; }
}
},
None => children.clone() // reply with all children of this node
};
let mut reply_with_nodes = BTreeMap::new();
reply_with_nodes.insert(get.node_id.clone(), reply_with_children);
let mut recipients = HashSet::new();
recipients.insert(get.from.clone());
let my_addr = ctx.addr.clone();
let put = Put::new(reply_with_nodes, Some(get.id.clone()), my_addr);
let _ = get.from.sender.send(Message::Put(put));
} else {
debug!("have not {}", get.node_id);
}
}
fn handle_put(&self, put: Put, ctx: &ActorContext) {
for (node_id, update_data) in put.updated_nodes.iter().rev() { // return in reverse
debug!("saving k-v {}: {:?}", node_id, update_data);
let mut write = self.store.write().unwrap();
if let Some(children) = write.get_mut(node_id) {
for (child_id, child_data) in update_data {
if let Some(existing) = children.get(child_id) {
if child_data.updated_at >= existing.updated_at {
children.insert(child_id.clone(), child_data.clone());
}
} else {
children.insert(child_id.clone(), child_data.clone());
}
}
} else {
write.insert(node_id.to_string(), update_data.clone());
}
}
}
}
#[async_trait]
impl Actor for MemoryStorage {
async fn pre_start(&mut self, _ctx: &ActorContext) {
info!("MemoryStorage adapter starting");
/*
if self.config.stats {
self.update_stats();
}
*/
}
async fn handle(&mut self, message: Message, ctx: &ActorContext) {
match message {
Message::Get(get) => self.handle_get(get, ctx),
Message::Put(put) => self.handle_put(put, ctx),
_ => {}
}
}
}