use crate::{
actor_messages::{CommitMessage, Subscribe},
errors::AtomicServerResult,
handlers::web_sockets::WebSocketConnection,
search::SearchState,
};
use actix::{
prelude::{Actor, Context, Handler},
ActorStreamExt, Addr, ContextFutureSpawner,
};
use atomic_lib::{agents::ForAgent, Db, Storelike};
use chrono::Local;
use std::collections::{HashMap, HashSet};
pub struct CommitMonitor {
subscriptions: HashMap<String, HashSet<Addr<WebSocketConnection>>>,
store: Db,
search_state: SearchState,
last_search_commit: chrono::DateTime<Local>,
run_expensive_next_tick: bool,
}
const REBUILD_INDEX_TIME: std::time::Duration = std::time::Duration::from_secs(5);
impl Actor for CommitMonitor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
tracing::debug!("CommitMonitor started");
actix::utils::IntervalFunc::new(REBUILD_INDEX_TIME, Self::tick)
.finish()
.spawn(ctx);
}
}
impl Handler<Subscribe> for CommitMonitor {
type Result = ();
#[tracing::instrument(
name = "handle_subscribe",
skip_all,
fields(to = %msg.subject, agent = %msg.agent)
)]
fn handle(&mut self, msg: Subscribe, _ctx: &mut Context<Self>) {
if !msg.subject.starts_with(&self.store.get_self_url().unwrap()) {
tracing::warn!("can't subscribe to external resource");
return;
}
match self.store.get_resource(&msg.subject) {
Ok(resource) => {
match atomic_lib::hierarchy::check_read(
&self.store,
&resource,
&ForAgent::AgentSubject(msg.agent.clone()),
) {
Ok(_explanation) => {
let mut set = if let Some(set) = self.subscriptions.get(&msg.subject) {
set.clone()
} else {
HashSet::new()
};
set.insert(msg.addr);
tracing::debug!("handle subscribe {} ", msg.subject);
self.subscriptions.insert(msg.subject.clone(), set);
}
Err(unauthorized_err) => {
tracing::debug!(
"Not allowed {} to subscribe to {}: {}",
&msg.agent,
&msg.subject,
unauthorized_err
);
}
}
}
Err(e) => {
tracing::debug!(
"Subscribe failed for {} by {}: {}",
&msg.subject,
msg.agent,
e
);
}
}
}
}
impl CommitMonitor {
fn handle_internal(&mut self, msg: CommitMessage) -> AtomicServerResult<()> {
let target = msg.commit_response.commit.subject.clone();
if let Some(subscribers) = self.subscriptions.get(&target) {
tracing::debug!(
"Sending commit {} to {} subscribers",
target,
subscribers.len()
);
for connection in subscribers {
connection.do_send(msg.clone());
}
} else {
tracing::debug!("No subscribers for {}", target);
}
if let Some(resource) = &msg.commit_response.resource_new {
self.search_state.remove_resource(&target)?;
self.search_state.add_resource(resource, &self.store)?;
self.run_expensive_next_tick = true;
} else {
self.search_state.remove_resource(&target)?;
}
Ok(())
}
fn tick(&mut self, _ctx: &mut Context<Self>) {
if self.run_expensive_next_tick {
_ = self.update_expensive().map_err(|e| {
tracing::error!(
"Error during expensive update in Commit Monitor: {}",
e.to_string()
)
});
}
}
fn update_expensive(&mut self) -> AtomicServerResult<()> {
tracing::debug!("Update expensive");
self.search_state.writer.write()?.commit()?;
self.last_search_commit = chrono::Local::now();
self.run_expensive_next_tick = false;
Ok(())
}
}
impl Handler<CommitMessage> for CommitMonitor {
type Result = ();
#[tracing::instrument(name = "handle_commit_message", skip_all, fields(subscriptions = &self.subscriptions.len(), s = %msg.commit_response.commit_resource.get_subject()))]
fn handle(&mut self, msg: CommitMessage, _: &mut Context<Self>) {
match self.handle_internal(msg) {
Ok(_) => {}
Err(e) => {
tracing::error!(
"Handling commit in CommitMonitor failed, cache may not be fully updated: {}",
e
);
}
}
}
}
pub fn create_commit_monitor(store: Db, search_state: SearchState) -> Addr<CommitMonitor> {
tracing::info!("spawning commit monitor");
crate::commit_monitor::CommitMonitor::create(|_ctx: &mut Context<CommitMonitor>| {
CommitMonitor {
subscriptions: HashMap::new(),
store,
search_state,
run_expensive_next_tick: false,
last_search_commit: chrono::Local::now(),
}
})
}