use crate::commands::{queue, Call, CommandResult, Queue};
use crate::config::Config;
use crate::idb::table::{IndexType, Table};
use crate::idb::{Database, DatabaseCommand};
use crate::ig::docs::Doc;
use crate::platform::Platform;
use crate::repository::background::BackgroundCommand;
use crate::repository::{FileEvent, Repository, RepositoryFile};
use crate::spawn;
use anyhow::Context;
use chrono::{DateTime, Local};
use itertools::Itertools;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::SystemTime;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use yaml_rust::{Yaml, YamlLoader};
/// Describes a loader which turns a data file into a table of the `Database`.
///
/// Implementors have to provide `file_changed` and `platform`; all other methods
/// come with default implementations driven by the loader's YAML config.
#[async_trait::async_trait]
pub trait Loader: Display + Send + Sync {
    /// Handles a change of the loader described by `loader_info`.
    ///
    /// This is the only processing hook without a default implementation.
    async fn file_changed(&self, loader_info: &LoaderInfo) -> anyhow::Result<()>;

    /// Builds a table from `doc` using the configured indices and registers it in
    /// the `Database` under the configured table name.
    ///
    /// # Errors
    /// Fails if the table cannot be built, if the config lacks a `table` entry or
    /// if the database rejects the `CreateTable` command.
    async fn update_table(&self, doc: Doc, loader_info: &LoaderInfo) -> anyhow::Result<()> {
        let index_types = self.indices(loader_info);
        let table = Table::new(doc, index_types).context("Failed to build table.")?;

        let database = self.platform().require::<Database>();
        let name = self.table_name(loader_info)?.to_owned();
        database
            .perform(DatabaseCommand::CreateTable(name, table))
            .await
            .context("Failed to create table.")?;

        Ok(())
    }

    /// Provides the platform which is used to look up the `Database`.
    fn platform(&self) -> &Arc<Platform>;

    /// Reads the target table name from the `table` entry of the loader config.
    ///
    /// # Errors
    /// Fails if `table` is missing or not a string.
    fn table_name<'a>(&self, loader_info: &'a LoaderInfo) -> anyhow::Result<&'a str> {
        let configured = loader_info.config["table"].as_str();
        configured.context("Missing config 'table'!")
    }

    /// Collects the indices requested by the loader config.
    ///
    /// Every non-empty string in `fulltextIndices` yields a fulltext index. Every
    /// non-empty string in `indices` yields a lookup index, unless an index for the
    /// same field name has already been collected.
    fn indices(&self, loader_info: &LoaderInfo) -> Vec<IndexType> {
        let mut result: Vec<IndexType> = loader_info.config["fulltextIndices"]
            .as_vec()
            .into_iter()
            .flatten()
            .filter_map(|value| value.as_str())
            .filter(|name| !name.is_empty())
            .map(|name| IndexType::fulltext(name))
            .collect();

        let lookup_fields = loader_info.config["indices"]
            .as_vec()
            .into_iter()
            .flatten()
            .filter_map(|value| value.as_str())
            .filter(|name| !name.is_empty());

        for name in lookup_fields {
            let already_indexed = result.iter().any(|existing| existing.field_name() == name);
            if !already_indexed {
                result.push(IndexType::lookup(name));
            }
        }

        result
    }

    /// Drops the table of the given loader.
    ///
    /// If no table name can be determined from the config, nothing happens and
    /// `Ok(())` is returned.
    ///
    /// # Errors
    /// Fails if the database rejects the `DropTable` command.
    async fn file_deleted(&self, loader_info: &LoaderInfo) -> anyhow::Result<()> {
        let name = match self.table_name(loader_info) {
            Ok(name) => name.to_string(),
            // Without a table name there is nothing which could be dropped.
            Err(_) => return Ok(()),
        };

        self.platform()
            .require::<Database>()
            .perform(DatabaseCommand::DropTable(name))
            .await
            .context("Failed to drop table.")?;

        Ok(())
    }
}
/// Bookkeeping record for a single loader descriptor known to the loader actor.
///
/// Cloning yields a record which shares `last_error` with the original (it is
/// wrapped in an `Arc`), all other fields are copied.
#[derive(Clone)]
pub struct LoaderInfo {
/// The loader implementation which processes the data file.
loader: Arc<dyn Loader>,
/// The parsed YAML contents of the loader descriptor.
config: Yaml,
/// Path of the loader descriptor itself.
loader_file: PathBuf,
/// Resolved path of the data file named by the `file` entry of the descriptor.
data_file: PathBuf,
/// Value of the `namespace` entry of the descriptor.
namespace: String,
/// Whether `namespace` is part of the currently configured namespaces.
enabled: bool,
/// Point in time at which the last load was scheduled, `None` if not loaded.
last_load: Option<SystemTime>,
/// Last error message stored for this loader, empty if there is none.
last_error: Arc<Mutex<String>>,
}
impl LoaderInfo {
    /// Determines whether the loader has to be executed (again).
    ///
    /// Returns `true` if the loader was never loaded, or if either the loader
    /// descriptor or the data file was modified after the last load.
    ///
    /// # Errors
    /// Fails if the metadata or the modification date of one of the two files
    /// cannot be obtained. The underlying I/O error is kept as the cause.
    async fn needs_reload(&self) -> anyhow::Result<bool> {
        let last_load = match self.last_load {
            Some(last_load) => last_load,
            None => return Ok(true),
        };

        // Both timestamps are always fetched so that a missing file is reported
        // even if the other one already demands a reload.
        let loader_last_modified = Self::last_modified(&self.loader_file).await?;
        let data_last_modified = Self::last_modified(&self.data_file).await?;

        Ok(loader_last_modified > last_load || data_last_modified > last_load)
    }

    /// Fetches the last modification date of the file at `path`.
    async fn last_modified(path: &Path) -> anyhow::Result<SystemTime> {
        tokio::fs::metadata(path)
            .await
            .with_context(|| format!("Failed to fetch metadata of {}", path.to_string_lossy()))?
            .modified()
            .with_context(|| {
                format!(
                    "Failed to determine last modified date of {}",
                    path.to_string_lossy()
                )
            })
    }

    /// Returns the file name (last path component) of the loader descriptor or
    /// an empty string if the path has none.
    pub fn loader_file_name(&self) -> Cow<'_, str> {
        self.loader_file
            .file_name()
            .map(|name| name.to_string_lossy())
            .unwrap_or(Cow::Borrowed(""))
    }

    /// Returns the file name (last path component) of the data file or an empty
    /// string if the path has none.
    pub fn file_name(&self) -> Cow<'_, str> {
        self.data_file
            .file_name()
            .map(|name| name.to_string_lossy())
            .unwrap_or(Cow::Borrowed(""))
    }

    /// Returns a copy of the last error message stored for this loader.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn last_error(&self) -> String {
        self.last_error.lock().unwrap().clone()
    }

    /// Replaces the stored error message. The message is shared by all clones
    /// of this info.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn store_error(&self, error: String) {
        *self.last_error.lock().unwrap() = error;
    }

    /// Returns the loader implementation in charge of the data file.
    pub fn get_loader(&self) -> &Arc<dyn Loader> {
        &self.loader
    }

    /// Returns the parsed YAML config of the loader descriptor.
    pub fn get_config(&self) -> &Yaml {
        &self.config
    }

    /// Returns the path of the data file.
    pub fn get_data(&self) -> &Path {
        self.data_file.as_path()
    }

    /// Reads the whole data file into a string.
    ///
    /// # Errors
    /// Fails if the file cannot be read as UTF-8 text.
    pub async fn get_data_str(&self) -> anyhow::Result<String> {
        tokio::fs::read_to_string(self.get_data())
            .await
            .context("Unable to read data file")
    }
}
/// Renders the info as `<loader file> -> <data file>` using lossy path conversion.
impl Display for LoaderInfo {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let loader_file = self.loader_file.to_string_lossy();
        let data_file = self.data_file.to_string_lossy();

        f.write_fmt(format_args!("{} -> {}", loader_file, data_file))
    }
}
impl Debug for LoaderInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
/// Commands understood by the loader actor.
///
/// The actor converts the token of an incoming call into a variant via
/// `FromPrimitive::from_usize`, so the order of the variants defines the tokens.
#[derive(FromPrimitive)]
pub enum LoaderCommands {
/// Reports all known loaders (see `list_command`).
List,
}
/// Starts the loader actor and returns the queue used to send commands to it.
///
/// The spawned task runs while `platform.is_running()` and reacts to three sources:
/// file events of the repository, calls arriving on the returned queue and
/// change notifications of the `Config`. Failures of the individual handlers are
/// logged and never terminate the loop.
pub fn actor(
platform: Arc<Platform>,
repository: Arc<Repository>,
mut background_task_sender: mpsc::Sender<BackgroundCommand>,
) -> Queue {
let (command_queue, mut commands_endpoint) = queue();
let mut listener = repository.listener();
spawn!(async move {
use crate::commands::ResultExt;
// Registry of all known loaders, keyed by the data file named in their descriptor.
let mut loaders = HashMap::new();
let config = platform.require::<Config>();
let mut config_changed_flag = config.notifier();
let mut namespaces = load_namespaces(&config);
while platform.is_running() {
tokio::select! {
event = listener.recv() => {
match event {
Ok(FileEvent::FileChanged(file)) => {
// Files below "/loaders/" are loader descriptors, everything else is
// treated as a (potential) data file.
if file.name.starts_with("/loaders/") {
if let Err(e) = loader_changed(
&mut loaders,
&namespaces,
&file,
&mut background_task_sender,
&repository,
).await {
log::error!("Failed to update loader {} - {:?}", file.name, e);
}
} else if let Err(e) = file_changed(&mut loaders, &file, &mut background_task_sender).await {
log::error!("Failed to update data file {} - {:?}", file.name, e);
}
}
Ok(FileEvent::FileDeleted(file)) => {
if file.name.starts_with("/loaders/") {
if let Err(e) = loader_removed(&mut loaders, &file, &mut background_task_sender).await {
log::error!("Failed to remove loader {} - {:?}", file.name, e);
}
} else if let Err(e) = file_removed(&mut loaders, &file, &mut background_task_sender).await {
log::error!("Failed to remove loader for deleted file {} - {:?}", file.name, e);
}
}
Err(error) => {
match error {
// NOTE(review): a closed channel is ignored and the loop keeps polling it -
// confirm that this cannot turn into a busy loop.
RecvError::Closed => {}
RecvError::Lagged(count) => {
log::error!("{} files were not handled, because the event channel is full - {:?}", count, error);
}
}
}
}
}
// NOTE(review): calls with an unknown token are dropped without being completed,
// and a `None` (closed command queue) is ignored - confirm that both are intended.
cmd = commands_endpoint.recv() => if let Some(mut call) = cmd {
if let
Some(LoaderCommands::List) = LoaderCommands::from_usize(call.token) { list_command(&mut call, &loaders).complete(call) }
},
// The config changed: re-read the namespaces and enable / disable loaders accordingly.
_ = config_changed_flag.recv() => {
namespaces = load_namespaces(&config);
if let Err(e) = enforce_namespaces(&mut loaders, &namespaces, &mut background_task_sender).await {
log::error!("Failed to enforce namespaces: {:?}", e);
}
}
}
}
});
command_queue
}
fn load_namespaces(config: &Arc<Config>) -> Vec<String> {
if let Yaml::Array(ref namespaces) = config.current().config()["repository"]["namespaces"] {
namespaces
.iter()
.map(|element| element.as_str().unwrap_or("").to_owned())
.filter(|ns| !ns.is_empty())
.collect()
} else {
vec![]
}
}
/// Handles a created or modified loader descriptor.
///
/// The descriptor is parsed as YAML and must provide the string entries `file`,
/// `loader` and `namespace`. If a loader for this descriptor is already
/// registered, it is updated in place (and moved to the bucket of its new data
/// file if `file` changed). Otherwise a new loader is registered. Depending on
/// whether the namespace is enabled, a load or unload is scheduled via
/// `background_task_sender`.
///
/// # Errors
/// Fails if the descriptor cannot be read or parsed, if a required entry is
/// missing, if the data file or the loader cannot be resolved, or if the
/// background command cannot be sent.
async fn loader_changed(
    loaders: &mut HashMap<String, Vec<LoaderInfo>>,
    namespaces: &[String],
    file: &RepositoryFile,
    background_task_sender: &mut mpsc::Sender<BackgroundCommand>,
    repository: &Arc<Repository>,
) -> anyhow::Result<()> {
    let config_data = tokio::fs::read_to_string(file.path.as_path()).await?;
    let mut configs = YamlLoader::load_from_str(config_data.as_str())?;
    // If the descriptor contains several YAML documents, only the last one is used.
    let config = configs.pop().unwrap_or(Yaml::Null);

    let data_file = config["file"]
        .as_str()
        .context("Missing 'file' in loader description!")?;
    let loader = config["loader"]
        .as_str()
        .context("Missing 'loader' in loader description!")?;
    let namespace = config["namespace"]
        .as_str()
        .context("Missing 'namespace' in loader description!")?
        .to_owned();
    let enabled = namespaces.contains(&namespace);

    // Find the bucket (and position) under which this descriptor is registered.
    let registered_under = loaders.iter().find_map(|(key, infos)| {
        infos
            .iter()
            .position(|info| info.loader_file == file.path)
            .map(|index| (key.clone(), index))
    });

    if let Some((current_key, index)) = registered_under {
        // Resolve first, so that a failure leaves the registry untouched.
        let resolved_data_file = Repository::resolve(data_file).await?;

        // The registry is keyed by data file. If the descriptor now points to
        // another file, the loader has to follow, otherwise change events of the
        // new data file would never reach it.
        if current_key.as_str() != data_file {
            move_loader(loaders, &current_key, index, data_file);
        }

        let info = loaders
            .get_mut(data_file)
            .and_then(|infos| infos.iter_mut().find(|info| info.loader_file == file.path))
            .context("Lost track of a registered loader while updating it.")?;

        // NOTE(review): the loader implementation (`info.loader`) is not re-resolved
        // here, so a changed 'loader' entry has no effect - confirm this is intended.
        info.data_file = resolved_data_file;
        info.config = config.clone();
        info.namespace = namespace;

        if info.needs_reload().await? {
            let was_enabled = info.enabled;
            info.enabled = enabled;
            info.last_error.lock().unwrap().clear();

            if info.enabled {
                info.last_load = Some(SystemTime::now());
                background_task_sender
                    .send(BackgroundCommand::ExecuteLoaderForChange(info.clone()))
                    .await?;
            } else if was_enabled {
                info.last_load = None;
                background_task_sender
                    .send(BackgroundCommand::ExecuteLoaderForDelete(info.clone()))
                    .await?;
            }
        }

        return Ok(());
    }

    let new_loader = LoaderInfo {
        loader: repository.find_loader(loader)?,
        config: config.clone(),
        loader_file: file.path.clone(),
        data_file: Repository::resolve(data_file).await?,
        enabled,
        namespace,
        last_load: if enabled {
            Some(SystemTime::now())
        } else {
            None
        },
        last_error: Arc::new(Mutex::new(String::new())),
    };

    // Only pay for a copy of the info if a load has to be scheduled.
    let command = if enabled {
        Some(BackgroundCommand::ExecuteLoaderForChange(
            new_loader.clone(),
        ))
    } else {
        None
    };

    loaders
        .entry(data_file.to_owned())
        .or_default()
        .push(new_loader);

    if let Some(command) = command {
        background_task_sender.send(command).await?;
    }

    Ok(())
}

/// Moves the loader at position `index` of bucket `from` into bucket `to` and
/// drops `from` if it became empty. Does nothing if there is no such loader.
fn move_loader(
    loaders: &mut HashMap<String, Vec<LoaderInfo>>,
    from: &str,
    index: usize,
    to: &str,
) {
    let moved = match loaders.get_mut(from) {
        Some(bucket) if index < bucket.len() => bucket.remove(index),
        _ => return,
    };

    if loaders.get(from).map_or(false, |bucket| bucket.is_empty()) {
        loaders.remove(from);
    }

    loaders.entry(to.to_owned()).or_default().push(moved);
}
/// Handles a created or modified data file.
///
/// Every enabled loader registered for `file.name` which reports that it needs a
/// reload gets its load timestamp refreshed, its last error cleared and a load
/// scheduled via `background_task_sender`.
///
/// # Errors
/// Fails if the reload check fails or if the background command cannot be sent.
async fn file_changed(
    loaders: &mut HashMap<String, Vec<LoaderInfo>>,
    file: &RepositoryFile,
    background_task_sender: &mut mpsc::Sender<BackgroundCommand>,
) -> anyhow::Result<()> {
    let registered = match loaders.get_mut(&file.name) {
        Some(registered) => registered,
        None => return Ok(()),
    };

    for info in registered.iter_mut() {
        if !info.enabled {
            continue;
        }
        if !info.needs_reload().await? {
            continue;
        }

        info.last_load = Some(SystemTime::now());
        info.last_error.lock().unwrap().clear();

        let command = BackgroundCommand::ExecuteLoaderForChange(info.clone());
        background_task_sender.send(command).await?;
    }

    Ok(())
}
/// Handles a deleted loader descriptor.
///
/// The loader registered for `file.path` is removed from each bucket containing
/// it. If it was enabled, an unload is scheduled via `background_task_sender`.
/// Buckets which end up empty are dropped from the registry so that it does not
/// accumulate stale keys.
///
/// # Errors
/// Fails if the background command cannot be sent. In this case the loader has
/// already been removed from its bucket.
async fn loader_removed(
    loaders: &mut HashMap<String, Vec<LoaderInfo>>,
    file: &RepositoryFile,
    background_task_sender: &mut mpsc::Sender<BackgroundCommand>,
) -> anyhow::Result<()> {
    for infos in loaders.values_mut() {
        if let Some((index, _)) = infos
            .iter()
            .find_position(|info| info.loader_file == file.path)
        {
            let info = infos.remove(index);
            if info.enabled {
                // The removed info is not needed anymore and can be handed over as is.
                background_task_sender
                    .send(BackgroundCommand::ExecuteLoaderForDelete(info))
                    .await?;
            }
        }
    }

    // Don't keep buckets around for data files which no loader refers to anymore.
    loaders.retain(|_, infos| !infos.is_empty());

    Ok(())
}
/// Handles a deleted data file.
///
/// Every enabled loader registered for `file.name` is marked as not loaded, its
/// last error is cleared and an unload is scheduled via `background_task_sender`.
/// The loaders themselves stay registered.
///
/// # Errors
/// Fails if the background command cannot be sent.
async fn file_removed(
    loaders: &mut HashMap<String, Vec<LoaderInfo>>,
    file: &RepositoryFile,
    background_task_sender: &mut mpsc::Sender<BackgroundCommand>,
) -> anyhow::Result<()> {
    let registered = match loaders.get_mut(&file.name) {
        Some(registered) => registered,
        None => return Ok(()),
    };

    for info in registered.iter_mut() {
        if !info.enabled {
            continue;
        }

        info.last_load = None;
        info.last_error.lock().unwrap().clear();

        let command = BackgroundCommand::ExecuteLoaderForDelete(info.clone());
        background_task_sender.send(command).await?;
    }

    Ok(())
}
/// Brings the enabled state of all loaders in line with the given namespaces.
///
/// Loaders whose namespace became enabled are marked as loaded and a load is
/// scheduled. Loaders whose namespace became disabled are marked as not loaded
/// and an unload is scheduled. Loaders whose state did not change are skipped.
///
/// # Errors
/// Fails if a background command cannot be sent.
async fn enforce_namespaces(
    loaders: &mut HashMap<String, Vec<LoaderInfo>>,
    namespaces: &[String],
    background_task_sender: &mut mpsc::Sender<BackgroundCommand>,
) -> anyhow::Result<()> {
    for info in loaders.values_mut().flatten() {
        let is_enabled = namespaces.contains(&info.namespace);
        if is_enabled == info.enabled {
            continue;
        }
        info.enabled = is_enabled;

        let command = if is_enabled {
            log::info!(
                "{} has been enabled via namespace {}. Loading...",
                info.file_name(),
                &info.namespace
            );
            info.last_load = Some(SystemTime::now());
            BackgroundCommand::ExecuteLoaderForChange(info.clone())
        } else {
            log::info!(
                "{} has been disabled via namespace {}. Unloading...",
                info.file_name(),
                &info.namespace
            );
            info.last_load = None;
            BackgroundCommand::ExecuteLoaderForDelete(info.clone())
        };

        background_task_sender.send(command).await?;
    }

    Ok(())
}
/// Reports all known loaders to the caller.
///
/// If the request carries at least one parameter, an array with one six-element
/// entry per loader is emitted (loader file, data file, namespace, enabled flag,
/// last load as RFC 3339 or an empty string, last error). Otherwise a single
/// human readable bulk string is produced.
fn list_command(call: &mut Call, loaders: &HashMap<String, Vec<LoaderInfo>>) -> CommandResult {
    let all_loaders: Vec<&LoaderInfo> = loaders.values().flatten().collect();
    let wants_raw_output = call.request.parameter_count() > 0;

    if !wants_raw_output {
        let mut report = String::new();
        for info in all_loaders {
            let last_load = info
                .last_load
                .map(|timestamp| {
                    DateTime::<Local>::from(timestamp)
                        .format("%Y-%m-%dT%H:%M:%S")
                        .to_string()
                })
                .unwrap_or_default();

            report.push_str(&format!("Loader File: {}\n", info.loader_file_name()));
            report.push_str(&format!("Data File: {}\n", info.file_name()));
            report.push_str(&format!("Namespace: {}\n", info.namespace));
            report.push_str(&format!("Enabled: {}\n", info.enabled));
            report.push_str(&format!("Last Load: {}\n", last_load));
            report.push_str(&format!("Last Error: {}\n", info.last_error()));
            report.push_str(crate::response::SEPARATOR);
        }
        call.response.bulk(report)?;

        return Ok(());
    }

    call.response.array(all_loaders.len() as i32)?;
    for info in all_loaders {
        call.response.array(6)?;
        call.response.simple(info.loader_file_name())?;
        call.response.simple(info.file_name())?;
        call.response.simple(&info.namespace)?;
        call.response.boolean(info.enabled)?;
        match info.last_load {
            Some(timestamp) => {
                call.response
                    .simple(DateTime::<Local>::from(timestamp).to_rfc3339())?;
            }
            None => {
                call.response.simple("")?;
            }
        }
        call.response.bulk(info.last_error())?;
    }

    Ok(())
}