#![allow(non_snake_case)]
use super::memory_book::*;
use crate::{connection::NetResult, error::NetworkError, tweetlog::*};
use lib3h_protocol::{
data_types::{
DirectMessageData, EntryListData, FetchEntryData, FetchEntryResultData, GenericResultData,
GetListData, ProvidedEntryData, QueryEntryData, QueryEntryResultData, StoreEntryAspectData,
},
protocol_client::Lib3hClientProtocol,
protocol_server::Lib3hServerProtocol,
types::AgentPubKey,
};
use holochain_locksmith::{Mutex, RwLock};
use holochain_persistence_api::cas::content::Address;
use lib3h_protocol::types::SpaceHash;
use std::collections::{hash_map::Entry, HashMap, HashSet};
type RequestId = String;
type InMemoryServerMap = HashMap<String, Mutex<InMemoryServer>>;
lazy_static! {
pub(crate) static ref MEMORY_SERVER_MAP: RwLock<InMemoryServerMap> =
RwLock::new(HashMap::new());
}
pub(crate) struct InMemoryServer {
senders: HashMap<ChainId, crossbeam_channel::Sender<Lib3hServerProtocol>>,
senders_by_dna:
HashMap<Address, HashMap<AgentPubKey, crossbeam_channel::Sender<Lib3hServerProtocol>>>,
name: String,
client_count: usize,
authored_book: ChainBook,
stored_book: ChainBook,
trackdna_book: HashSet<ChainId>,
request_book: HashMap<RequestId, ChainId>,
request_count: usize,
log: TweetProxy,
}
impl InMemoryServer {
fn priv_generate_request_id(&mut self) -> String {
self.request_count += 1;
format!("req_{}", self.request_count)
}
fn priv_create_request(&mut self, dna_address: &Address, agent_id: &Address) -> RequestId {
let chain_id = into_chain_id(dna_address, agent_id);
self.priv_create_request_with_chain_id(&chain_id)
}
fn priv_create_request_with_chain_id(&mut self, chain_id: &ChainId) -> RequestId {
let req_id = self.priv_generate_request_id();
self.request_book
.insert(req_id.clone(), chain_id.to_string());
req_id
}
fn priv_check_request(&self, request_id: &RequestId) -> Option<&ChainId> {
self.log.t(&format!(
"---- priv_check_request('{}') in {:?} ?",
request_id,
self.request_book.clone(),
));
self.request_book.get(&request_id.clone())
}
fn priv_request_all_lists(&mut self, dna_address: &Address, agent_id: &AgentPubKey) {
let request_id = self.priv_create_request(dna_address, agent_id);
self.priv_send_one(
dna_address,
agent_id,
Lib3hServerProtocol::HandleGetAuthoringEntryList(GetListData {
request_id,
provider_agent_id: agent_id.clone(),
space_address: SpaceHash::from(dna_address.clone()),
}),
)
.expect("Sending HandleGetAuthoringEntryList failed");
let request_id = self.priv_create_request(dna_address, agent_id);
self.priv_send_one(
dna_address,
agent_id,
Lib3hServerProtocol::HandleGetGossipingEntryList(GetListData {
request_id,
provider_agent_id: agent_id.clone(),
space_address: SpaceHash::from(dna_address.clone()),
}),
)
.expect("Sending HandleGetHoldingEntryList failed");
}
}
impl InMemoryServer {
pub fn new(name: String) -> Self {
Self {
name,
senders: HashMap::new(),
senders_by_dna: HashMap::new(),
client_count: 0,
request_book: HashMap::new(),
authored_book: HashMap::new(),
stored_book: HashMap::new(),
request_count: 0,
trackdna_book: HashSet::new(),
log: TweetProxy::new("memory_server"),
}
}
pub fn clock_in(&mut self) {
self.log
.t(&format!("+++ '{}' clock_in()", self.name.clone()));
self.client_count += 1;
}
pub fn clock_out(&mut self) {
self.log
.t(&format!("--- '{}' clock_out", self.name.clone()));
assert!(self.client_count > 0);
self.client_count -= 1;
if self.client_count == 0 {
self.log
.t(&format!("--- '{}' CLEAR CHANNELS", self.name.clone()));
self.senders.clear();
self.senders_by_dna.clear();
}
}
pub fn register_chain(
&mut self,
dna_address: &Address,
agent_id: &AgentPubKey,
sender: crossbeam_channel::Sender<Lib3hServerProtocol>,
) -> NetResult<()> {
self.senders
.insert(into_chain_id(dna_address, agent_id), sender.clone());
match self.senders_by_dna.entry(dna_address.to_owned()) {
Entry::Occupied(mut e) => {
e.get_mut().insert(agent_id.clone(), sender.clone());
}
Entry::Vacant(e) => {
let mut map = HashMap::new();
map.insert(agent_id.clone(), sender.clone());
e.insert(map);
}
};
Ok(())
}
pub fn unregister_chain(&mut self, dna_address: &Address, agent_id: &Address) {
let chain_id = into_chain_id(dna_address, agent_id);
self.log.d(&format!("unregistering '{}'", chain_id));
let maybe_sender = self.senders.remove(&chain_id);
if maybe_sender.is_none() {
return;
}
match self.senders_by_dna.entry(dna_address.to_owned()) {
Entry::Occupied(mut senders) => {
senders.get_mut().remove(agent_id);
}
Entry::Vacant(_) => unreachable!(),
};
self.log.d(&format!("unregistering '{}' DONE", chain_id));
}
pub fn serve(&mut self, data: Lib3hClientProtocol) -> NetResult<()> {
self.log
.d(&format!(">>>> '{}' recv: {:?}", self.name.clone(), data));
match data {
Lib3hClientProtocol::SuccessResult(msg) => {
let dna_address = msg.space_address.clone();
let to_agent_id = msg.to_agent_id.clone();
let is_tracked =
self.priv_check_or_fail(&dna_address.clone().into(), &to_agent_id, None)?;
if !is_tracked {
return Ok(());
}
self.priv_send_one(
&dna_address.into(),
&to_agent_id,
Lib3hServerProtocol::SuccessResult(msg.clone()),
)?;
}
Lib3hClientProtocol::JoinSpace(msg) => {
let dna_address = msg.space_address.clone();
let agent_id = msg.agent_id.clone();
println!(
"JOINING SPACE: dna({}), agent({})",
dna_address.to_string(),
agent_id.to_string()
);
let chain_id = into_chain_id(&dna_address.clone().into(), &agent_id);
if self.trackdna_book.contains(&chain_id) {
self.log.e(&format!(
"({}) ##### DNA already tracked: {}",
self.name.clone(),
chain_id
));
return Ok(());
}
self.trackdna_book.insert(chain_id);
self.priv_request_all_lists(&dna_address.clone().into(), &agent_id);
}
Lib3hClientProtocol::LeaveSpace(msg) => {
let dna_address = msg.space_address.clone();
let agent_id = msg.agent_id.clone();
let chain_id = into_chain_id(&dna_address.into(), &agent_id);
if !self.trackdna_book.contains(&chain_id) {
self.log.w(&format!(
"Trying to untrack an already untracked DNA: {}",
chain_id
));
return Ok(());
}
self.trackdna_book.remove(&chain_id);
}
Lib3hClientProtocol::SendDirectMessage(msg) => {
self.priv_serve_SendMessage(&msg)?;
}
Lib3hClientProtocol::HandleSendDirectMessageResult(msg) => {
self.priv_serve_HandleSendMessageResult(&msg)?;
}
Lib3hClientProtocol::PublishEntry(msg) => {
self.priv_serve_PublishEntry(&msg)?;
}
Lib3hClientProtocol::HandleFetchEntryResult(msg) => {
self.priv_serve_HandleFetchEntryResult(&msg)?;
}
Lib3hClientProtocol::QueryEntry(msg) => {
self.priv_serve_QueryEntry(&msg)?;
}
Lib3hClientProtocol::HandleQueryEntryResult(msg) => {
self.priv_serve_HandleQueryEntryResult(&msg)?;
}
Lib3hClientProtocol::HandleGetAuthoringEntryListResult(msg) => {
self.priv_serve_HandleGetAuthoringEntryListResult(&msg)?;
}
Lib3hClientProtocol::HandleGetGossipingEntryListResult(msg) => {
self.priv_serve_HandleGetGossipingEntryListResult(&msg);
}
msg => {
self.log.w(&format!("unexpected {:?}", &msg));
}
}
Ok(())
}
}
impl InMemoryServer {
fn priv_check_or_fail(
&mut self,
dna_address: &Address,
agent_id: &AgentPubKey,
maybe_sender_info: Option<(AgentPubKey, Option<String>)>,
) -> NetResult<bool> {
let chain_id = into_chain_id(dna_address, agent_id);
if self.trackdna_book.contains(&chain_id) {
self.log.t(&format!(
"---- '{}' check OK: {}",
self.name.clone(),
chain_id,
));
return Ok(true);
};
if maybe_sender_info.is_none() {
self.log.e(&format!(
"#### '{}' check failed: {}",
self.name.clone(),
chain_id
));
return Err(NetworkError::GenericError {
error: "DNA not tracked by agent and no sender info.".to_string(),
}
.into());
}
let sender_info = maybe_sender_info.unwrap();
let sender_agent_id = sender_info.0;
let sender_request_id = sender_info.1.unwrap_or_default();
let fail_msg = GenericResultData {
space_address: dna_address.clone().into(),
request_id: sender_request_id,
to_agent_id: sender_agent_id.clone(),
result_info: "DNA not tracked by agent".into(),
};
self.log.e(&format!(
"#### '{}' check failed for {}.\n\t Sending failure {:?}",
self.name.clone(),
chain_id,
fail_msg.clone()
));
self.priv_send_one(
dna_address,
&sender_agent_id,
Lib3hServerProtocol::FailureResult(fail_msg),
)?;
Ok(false)
}
fn priv_send_one_with_chain_id(
&mut self,
chain_id: &str,
data: Lib3hServerProtocol,
) -> NetResult<()> {
let maybe_sender = self.senders.get_mut(chain_id);
if maybe_sender.is_none() {
self.log.e(&format!(
"#### ({}) error: No sender channel found for {}",
self.name.clone(),
chain_id,
));
return Err(format_err!(
"({}) No sender channel found for {}",
self.name.clone(),
chain_id,
));
}
let sender = maybe_sender.unwrap();
self.log
.d(&format!("<<<< '{}' send: {:?}", self.name.clone(), data));
sender.send(data)?;
Ok(())
}
fn priv_send_one(
&mut self,
dna_address: &Address,
to_agent_id: &Address,
data: Lib3hServerProtocol,
) -> NetResult<()> {
let chain_id = into_chain_id(dna_address, to_agent_id);
self.priv_send_one_with_chain_id(&chain_id, data)
}
fn priv_send_all(&mut self, dna_address: &Address, data: Lib3hServerProtocol) -> NetResult<()> {
if let Some(arr) = self.senders_by_dna.get_mut(dna_address) {
self.log.d(&format!(
"<<<< '{}' send all: {:?} ({})",
self.name.clone(),
data.clone(),
dna_address.clone()
));
for (_k, val) in arr.iter_mut() {
(*val).send(data.clone())?;
}
}
Ok(())
}
}
impl InMemoryServer {
fn priv_serve_SendMessage(&mut self, msg: &DirectMessageData) -> NetResult<()> {
let dna_address = msg.space_address.clone();
let from_agent_id: AgentPubKey = msg.from_agent_id.clone();
let to_agent_id: AgentPubKey = msg.to_agent_id.clone();
let sender_info = Some((from_agent_id.clone(), Some(msg.request_id.clone())));
let is_tracking = self.priv_check_or_fail(
&dna_address.clone().into(),
&from_agent_id,
sender_info.clone(),
)?;
if !is_tracking {
return Ok(());
}
let is_tracking =
self.priv_check_or_fail(&dna_address.clone().into(), &to_agent_id, sender_info)?;
if !is_tracking {
return Ok(());
}
self.priv_send_one(
&msg.space_address.clone().into(),
&msg.to_agent_id.clone().into(),
Lib3hServerProtocol::HandleSendDirectMessage(msg.clone()),
)?;
Ok(())
}
fn priv_serve_HandleSendMessageResult(&mut self, msg: &DirectMessageData) -> NetResult<()> {
let dna_address = msg.space_address.clone();
let from_agent_id: AgentPubKey = msg.from_agent_id.clone();
let to_agent_id: AgentPubKey = msg.to_agent_id.clone();
let sender_info = Some((from_agent_id.clone(), Some(msg.request_id.clone())));
let is_tracking = self.priv_check_or_fail(
&dna_address.clone().into(),
&from_agent_id,
sender_info.clone(),
)?;
if !is_tracking {
return Ok(());
}
let is_tracking =
self.priv_check_or_fail(&dna_address.clone().into(), &to_agent_id, sender_info)?;
if !is_tracking {
return Ok(());
}
self.priv_send_one(
&dna_address.clone().into(),
&to_agent_id,
Lib3hServerProtocol::SendDirectMessageResult(msg.clone()),
)?;
Ok(())
}
fn priv_serve_PublishEntry(&mut self, msg: &ProvidedEntryData) -> NetResult<()> {
let dna_address = msg.space_address.clone();
let provider_agent_id: AgentPubKey = msg.provider_agent_id.clone();
let entry_address = msg.entry.entry_address.clone();
let sender_info = Some((provider_agent_id.clone(), None));
let is_tracking = self.priv_check_or_fail(
&dna_address.clone().into(),
&provider_agent_id.clone(),
sender_info,
)?;
if !is_tracking {
return Ok(());
}
for aspect in msg.entry.aspect_list.clone() {
let chain_id = into_chain_id(&dna_address.clone().into(), &provider_agent_id);
let aspect_address = aspect.clone().aspect_address;
if !book_has_aspect(&self.stored_book, chain_id, &entry_address, &aspect_address) {
bookkeep(
&mut self.authored_book,
&dna_address.clone().into(),
&provider_agent_id,
&entry_address,
&aspect_address,
);
}
let store_msg = StoreEntryAspectData {
request_id: self.priv_generate_request_id(),
space_address: msg.clone().space_address,
provider_agent_id: msg.provider_agent_id.clone(),
entry_address: msg.entry.entry_address.clone(),
entry_aspect: aspect,
};
self.priv_send_all(
&dna_address.clone().into(),
Lib3hServerProtocol::HandleStoreEntryAspect(store_msg),
)?;
}
Ok(())
}
fn priv_serve_HandleFetchEntryResult(&mut self, msg: &FetchEntryResultData) -> NetResult<()> {
let dna_address = msg.space_address.clone();
let provider_agent_id: AgentPubKey = msg.provider_agent_id.clone();
let sender_info = Some((msg.provider_agent_id.clone(), Some(msg.request_id.clone())));
let is_tracking =
self.priv_check_or_fail(&dna_address.into(), &provider_agent_id, sender_info.clone())?;
if !is_tracking {
return Ok(());
}
if !self.request_book.contains_key(&msg.request_id) {
return Ok(());
}
let dht_data = ProvidedEntryData {
space_address: msg.space_address.clone(),
provider_agent_id: msg.provider_agent_id.clone(),
entry: msg.entry.clone(),
};
self.priv_serve_PublishEntry(&dht_data)?;
Ok(())
}
fn priv_serve_QueryEntry(&mut self, msg: &QueryEntryData) -> NetResult<()> {
let dna_address = msg.space_address.clone();
let sender_info = Some((msg.requester_agent_id.clone(), Some(msg.request_id.clone())));
let is_tracking = self.priv_check_or_fail(
&dna_address.clone().into(),
&msg.requester_agent_id.clone(),
sender_info,
)?;
if !is_tracking {
return Ok(());
}
match self.senders_by_dna.entry(dna_address.to_owned().into()) {
Entry::Occupied(mut e) => {
if !e.get().is_empty() {
for (k, r) in e.get_mut().iter() {
if k == &msg.requester_agent_id {
self.log.i(&format!("---- HandleQueryEntry {}", k));
r.send(Lib3hServerProtocol::HandleQueryEntry(msg.clone()).into())?;
return Ok(());
}
}
}
}
_ => unreachable!(),
};
let response = Lib3hServerProtocol::QueryEntryResult(QueryEntryResultData {
space_address: msg.space_address.clone(),
entry_address: msg.entry_address.clone(),
request_id: msg.request_id.clone(),
requester_agent_id: msg.requester_agent_id.clone(),
responder_agent_id: msg.requester_agent_id.clone(),
query_result: vec![].into(),
});
self.priv_send_one(
&dna_address.into(),
&msg.requester_agent_id.clone(),
response.into(),
)?;
Ok(())
}
fn priv_serve_HandleQueryEntryResult(&mut self, msg: &QueryEntryResultData) -> NetResult<()> {
let dna_address = msg.space_address.clone();
let responder_agent_id: AgentPubKey = msg.responder_agent_id.clone();
let requester_agent_id: AgentPubKey = msg.requester_agent_id.clone();
let sender_info = Some((responder_agent_id.clone(), Some(msg.request_id.clone())));
let is_tracking = self.priv_check_or_fail(
&dna_address.clone().into(),
&responder_agent_id.clone(),
sender_info.clone(),
)?;
if !is_tracking {
return Ok(());
}
let is_tracking = requester_agent_id.to_string() == ""
|| self.priv_check_or_fail(
&dna_address.clone().into(),
&requester_agent_id,
sender_info,
)?;
if !is_tracking {
return Ok(());
}
self.priv_send_one(
&dna_address.into(),
&requester_agent_id,
Lib3hServerProtocol::QueryEntryResult(msg.clone()).into(),
)?;
Ok(())
}
fn priv_serve_HandleGetAuthoringEntryListResult(
&mut self,
msg: &EntryListData,
) -> NetResult<()> {
let chain_id = self
.priv_check_request(&msg.request_id)
.expect("Not our request")
.to_string();
self.log.d(&format!(
"---- HandleGetAuthoringEntryListResult: chain_id = '{}'",
chain_id,
));
for (entry_address, aspect_address_list) in msg.address_map.clone() {
for aspect_address in aspect_address_list {
if book_has_aspect(
&self.authored_book,
chain_id.clone(),
&entry_address.clone(),
&aspect_address.clone(),
) {
continue;
}
let request_id = self.priv_create_request_with_chain_id(&chain_id);
self.priv_send_one_with_chain_id(
&chain_id,
Lib3hServerProtocol::HandleFetchEntry(FetchEntryData {
space_address: msg.space_address.clone(),
provider_agent_id: undo_chain_id(&chain_id).1,
request_id,
entry_address: entry_address.clone(),
aspect_address_list: Some(vec![aspect_address]),
})
.into(),
)?;
}
}
Ok(())
}
fn priv_serve_HandleGetGossipingEntryListResult(&mut self, msg: &EntryListData) {
let chain_id = {
if msg.request_id == "" {
let agent = msg.provider_agent_id.clone();
let space = msg.space_address.clone();
into_chain_id(&space.into(), &agent.into())
} else {
self.priv_check_request(&msg.request_id)
.expect("Not our request")
.to_string()
}
};
self.log.d(&format!(
"---- HandleGetHoldingEntryListResult: chain_id = '{}'",
chain_id,
));
for (entry_address, aspect_address_list) in msg.address_map.clone() {
for aspect_address in aspect_address_list {
if book_has_aspect(
&self.stored_book,
chain_id.clone(),
&entry_address,
&aspect_address,
) {
continue;
}
bookkeep_with_chain_id(
&mut self.stored_book,
chain_id.clone(),
&entry_address,
&aspect_address,
);
let request_id = self.priv_create_request_with_chain_id(&chain_id);
let _ = self.priv_send_one_with_chain_id(
&chain_id,
Lib3hServerProtocol::HandleFetchEntry(FetchEntryData {
space_address: msg.space_address.clone(),
provider_agent_id: undo_chain_id(&chain_id).1,
request_id,
entry_address: entry_address.clone(),
aspect_address_list: Some(vec![aspect_address]),
}),
);
}
}
}
}