use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use dsf_core::prelude::*;
use dsf_core::service::Subscriber;
pub use dsf_rpc::service::{ServiceInfo, ServiceState};
use crate::store::Store;
pub mod inst;
pub use inst::ServiceInst;
pub mod data;
pub use data::Data;
/// Registry of service instances, paired with the store used to persist them.
///
/// Both fields are held behind `Arc`, so a clone of the manager shares the
/// same service map and the same store handle as the original.
#[derive(Clone)]
pub struct ServiceManager {
/// Service instances keyed by service ID. The map is guarded by a `Mutex`;
/// each instance is individually guarded by its own `RwLock`.
pub(crate) services: Arc<Mutex<HashMap<Id, Arc<RwLock<ServiceInst>>>>>,
/// Backing store that service info and pages are written to and loaded from.
store: Arc<Mutex<Store>>,
}
impl ServiceManager {
pub fn new(store: Arc<Mutex<Store>>) -> Self {
let services = HashMap::new();
let s = Self {
services: Arc::new(Mutex::new(services)),
store,
};
s.load();
s
}
pub fn register(
&mut self,
service: Service,
primary_page: &Page,
state: ServiceState,
updated: Option<SystemTime>,
) -> Result<Arc<RwLock<ServiceInst>>, DsfError> {
let services = self.services.clone();
let mut services = services.lock().unwrap();
let id = service.id();
let inst = ServiceInst {
service,
state,
index: services.len(),
last_updated: updated,
primary_page: Some(primary_page.clone()),
replica_page: None,
changed: true,
};
self.sync_inst(&inst);
let inst = Arc::new(RwLock::new(inst));
services.insert(id, inst.clone());
Ok(inst)
}
pub fn known(&self, id: &Id) -> bool {
let services = self.services.clone();
let services = services.lock().unwrap();
services.contains_key(id)
}
pub fn find(&self, id: &Id) -> Option<Arc<RwLock<ServiceInst>>> {
let services = self.services.clone();
let services = services.lock().unwrap();
let service = match services.get(id) {
Some(v) => v,
None => return None,
};
Some(service.clone())
}
pub fn index(&self, index: usize) -> Option<Arc<RwLock<ServiceInst>>> {
let services = self.services.clone();
let services = services.lock().unwrap();
services
.iter()
.find(|(_id, s)| s.read().unwrap().index == index)
.map(|(_id, s)| s.clone())
}
pub fn index_to_id(&self, index: usize) -> Option<Id> {
let services = self.services.clone();
let services = services.lock().unwrap();
services
.iter()
.find(|(_id, s)| s.read().unwrap().index == index)
.map(|(id, _s)| id.clone())
}
pub fn info(&self, id: &Id) -> Option<ServiceInfo> {
match self.find(id) {
Some(s) => Some(s.read().unwrap().info()),
None => None,
}
}
pub fn remove(&self, id: &Id) -> Result<Option<ServiceInfo>, DsfError> {
let service = { self.services.lock().unwrap().remove(id) };
if let Some(s) = service {
let info = s.read().unwrap().info();
let _ = self.store.lock().unwrap().delete_service(&info);
return Ok(Some(info));
};
return Ok(None);
}
#[cfg(nope)]
pub fn data(&self, id: &Id, n: usize) -> Result<Vec<DataInfo>, DsfError> {
let service = match self.find(id) {
Some(s) => s,
None => return Err(DsfError::UnknownService),
};
let s = service.read().unwrap();
let d = s.get_data(n).map(|d| DataInfo {
service: id.clone(),
index: d.version(),
body: d.body().clone(),
previous: None,
signature: d.signature().unwrap(),
});
Ok(d.collect())
}
pub fn update_inst<F>(&mut self, id: &Id, f: F) -> Option<ServiceInfo>
where
F: Fn(&mut ServiceInst),
{
match self.find(id) {
Some(s) => {
let mut svc = s.write().unwrap();
(f)(&mut svc);
svc.changed = true;
Some(svc.info())
}
None => None,
}
}
pub fn updates_required(
&mut self,
state: ServiceState,
interval: Duration,
force: bool,
) -> Vec<ServiceInfo> {
let services = self.services.lock().unwrap();
let updates: Vec<_> = services
.iter()
.filter_map(|(_id, svc)| {
let i = svc.read().unwrap();
match i.update_required(state, interval, force) {
true => Some(i.info()),
false => None,
}
})
.collect();
trace!("updates required (state: {:?}): {:?}", state, updates);
updates
}
pub(crate) fn sync_inst(&mut self, inst: &ServiceInst) {
let store = self.store.lock().unwrap();
if let Err(e) = store.save_service(&inst.info()) {
error!("Error writing service instance {}: {:?}", inst.id(), e);
}
if let Some(p) = &inst.primary_page {
store.save_page(p).unwrap();
}
if let Some(p) = &inst.replica_page {
store.save_page(p).unwrap();
}
}
pub fn list(&self) -> Vec<ServiceInfo> {
let services = self.services.lock().unwrap();
services
.iter()
.map(|(_k, v)| v.read().unwrap().info())
.collect()
}
pub fn count(&self) -> usize {
let services = self.services.lock().unwrap();
services.len()
}
pub fn sync(&self) {
let services = self.services.lock().unwrap();
let store = self.store.lock().unwrap();
for (id, inst) in services.iter() {
let mut inst = inst.write().unwrap();
let i: &mut ServiceInst = &mut inst;
if !i.changed {
continue;
}
if let Err(e) = store.save_service(&i.info()) {
error!("Error writing service instance {}: {:?}", id, e);
}
if let Some(p) = &i.primary_page {
store.save_page(p).unwrap();
}
if let Some(p) = &i.replica_page {
store.save_page(p).unwrap();
}
i.changed = false;
}
}
pub fn load(&self) {
let store = self.store.lock().unwrap();
let mut services = self.services.lock().unwrap();
let service_info = store.load_services().unwrap();
debug!("Loading {} services from database", service_info.len());
for i in service_info {
let public_key = i.public_key.clone();
let primary_page = match i.primary_page {
Some(p) => store
.load_page(&p, Some(public_key.clone()))
.unwrap()
.unwrap(),
None => {
warn!("No primary page for service: {:?}", i);
continue;
}
};
let replica_page = i
.replica_page
.map(|s| store.load_page(&s, Some(public_key.clone())).unwrap())
.flatten();
let mut service = Service::load(&primary_page).unwrap();
service.set_private_key(i.private_key);
service.set_secret_key(i.secret_key);
let s = ServiceInst {
service,
state: i.state,
index: i.index,
last_updated: i.last_updated,
primary_page: Some(primary_page),
replica_page,
changed: false,
};
services.entry(i.id).or_insert(Arc::new(RwLock::new(s)));
}
}
}
// Drop hook for the manager. The body is empty, so dropping a manager (or a
// clone of one) writes nothing to the store; instances still flagged as
// changed are only persisted by an explicit call to `sync`.
impl Drop for ServiceManager {
fn drop(&mut self) {
}
}