use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use anyhow::Context;
use tokio::sync::broadcast;
use crate::commands::CommandDictionary;
use crate::platform::Platform;
use crate::repository::foreground::ForegroundCommands;
use crate::repository::loader::{Loader, LoaderCommands};
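/// Determines how many file events can be buffered in the broadcast channel before senders
/// start to back off (see Repository::send_file_event).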
const FILE_EVENT_BROADCAST_BUFFER_SIZE: usize = 128;
mod background;
mod foreground;
pub mod loader;
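/// Represents a file stored in the repository along with its basic metadata
/// (path, size and last modification timestamp).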
#[derive(Clone, Debug)]
pub struct RepositoryFile {
name: String,
path: PathBuf,
size: u64,
last_modified: SystemTime,
}
impl Display for RepositoryFile {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.name)
}
}
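/// Two repository files are considered equal if they share the same name, independent of
/// size or modification timestamp.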
impl PartialEq for RepositoryFile {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
}
}
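/// Describes a change of a single repository file which is broadcast to all listeners.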
#[derive(Debug, Clone)]
pub enum FileEvent {
FileChanged(RepositoryFile),
FileDeleted(RepositoryFile),
}
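/// Describes updates of the repository as a whole: a refreshed list of all files, a single
/// file event or a new value of the epoch counter.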
pub enum BackgroundEvent {
FileListUpdated(Vec<RepositoryFile>),
FileEvent(FileEvent),
EpochCounter(i64),
}
impl Display for FileEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
FileEvent::FileChanged(file) => write!(f, "Changed: {}", file),
FileEvent::FileDeleted(file) => write!(f, "Deleted: {}", file),
}
}
}
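/// Shorthand for a receiver of the file event broadcast channel.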
type FileEventReceiver = broadcast::Receiver<FileEvent>;
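/// Provides access to the repository contents and keeps track of all registered loaders.
///
/// File events are distributed to all interested parties via an internal broadcast channel.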
pub struct Repository {
broadcast_sender: broadcast::Sender<FileEvent>,
loaders: Mutex<HashMap<String, Arc<dyn Loader>>>,
}
impl Repository {
fn new() -> Self {
let (broadcast_sender, _) = broadcast::channel(FILE_EVENT_BROADCAST_BUFFER_SIZE);
Repository {
broadcast_sender,
loaders: Mutex::new(HashMap::new()),
}
}
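    /// Registers or replaces the loader for the given name.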
pub fn register_loader(&self, name: String, loader: Arc<dyn Loader>) {
let _ = self.loaders.lock().unwrap().insert(name, loader);
}
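    /// Looks up the loader registered for the given name.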
pub fn find_loader(&self, name: &str) -> anyhow::Result<Arc<dyn Loader>> {
self.loaders
.lock()
.unwrap()
.get(name)
.cloned()
.context(format!("Unknown loader: {}", name))
}
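    /// Determines (and creates, if necessary) the base directory of the repository.
    ///
    /// During tests, "target/test-repository" is used instead of "repository" so that test runs
    /// do not interfere with local repository contents.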
    #[cfg(not(test))]
    async fn base_dir() -> PathBuf {
        Self::ensure_base_dir(Path::new("repository").to_path_buf()).await
    }

    #[cfg(test)]
    async fn base_dir() -> PathBuf {
        Self::ensure_base_dir(Path::new("target").join("test-repository")).await
    }

    async fn ensure_base_dir(path: PathBuf) -> PathBuf {
        if let Err(error) = tokio::fs::create_dir_all(&path).await {
            log::warn!(
                "Failed to create repository base directory {}: {}",
                path.to_string_lossy(),
                error
            );
        }

        path
    }
pub async fn resolve(file_name: &str) -> anyhow::Result<PathBuf> {
let mut result = Repository::base_dir().await;
for element in file_name.split('/').filter(|path| !path.is_empty()) {
result.push(element);
}
Ok(result)
}
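    /// Creates a new receiver which is notified about all file events emitted after this call.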
fn listener(&self) -> FileEventReceiver {
self.broadcast_sender.subscribe()
}
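    /// Broadcasts the given file event to all current listeners.
    ///
    /// If the broadcast buffer is full, sending is delayed for up to ten attempts of one second
    /// each so that slow consumers have a chance to catch up. Afterwards, the event is sent
    /// regardless.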
async fn send_file_event(&self, file_event: FileEvent) {
let mut attempt = 0;
while self.broadcast_sender.len() >= FILE_EVENT_BROADCAST_BUFFER_SIZE && attempt < 10 {
attempt += 1;
log::debug!(
"Pausing sending file events for 1 second because the broadcast buffer is full (Attempt {} of 10)",
attempt
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
let _ = self.broadcast_sender.send(file_event);
}
}
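/// Creates a new repository and registers it with the given platform.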
pub fn create(platform: &Arc<Platform>) -> Arc<Repository> {
let repo = Arc::new(Repository::new());
platform.register::<Repository>(repo.clone());
repo
}
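/// Starts the background, loader and foreground actors of the repository and registers the
/// REPO.* commands if a command dictionary is present.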
pub fn install(platform: Arc<Platform>, repository: Arc<Repository>) {
let (background_task_queue, update_notifier) = background::actor(platform.clone());
let loader_queue = loader::actor(
platform.clone(),
repository.clone(),
background_task_queue.clone(),
);
let command_queue = foreground::actor(
platform.clone(),
repository,
background_task_queue,
update_notifier,
);
if let Some(commands) = platform.find::<CommandDictionary>() {
commands.register_command(
"REPO.SCAN",
command_queue.clone(),
ForegroundCommands::Scan as usize,
);
commands.register_command(
"REPO.FETCH",
command_queue.clone(),
ForegroundCommands::Fetch as usize,
);
commands.register_command(
"REPO.STORE",
command_queue.clone(),
ForegroundCommands::Store as usize,
);
commands.register_command(
"REPO.FETCH_FORCED",
command_queue.clone(),
ForegroundCommands::ForceFetch as usize,
);
commands.register_command(
"REPO.RELOAD",
command_queue.clone(),
ForegroundCommands::ForceReload as usize,
);
commands.register_command(
"REPO.LIST",
command_queue.clone(),
ForegroundCommands::List as usize,
);
commands.register_command(
"REPO.DELETE",
command_queue.clone(),
ForegroundCommands::Delete as usize,
);
commands.register_command(
"REPO.EPOCHS",
command_queue.clone(),
ForegroundCommands::Epochs as usize,
);
commands.register_command(
"REPO.INC_EPOCH",
command_queue,
ForegroundCommands::IncEpoch as usize,
);
commands.register_command("REPO.LOADERS", loader_queue, LoaderCommands::List as usize);
}
}
#[cfg(test)]
mod tests {
use crate::builder::Builder;
use crate::config::Config;
use crate::platform::Platform;
use crate::repository::{FileEvent, FileEventReceiver, Repository};
use crate::server::Server;
use crate::spawn;
use crate::testing::{query_redis_async, test_async};
use chrono::{TimeZone, Utc};
use hyper::header::HeaderValue;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
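    /// Wipes the test repository directory, builds a platform with the repository installed and
    /// starts a server on port 1503.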
async fn setup_env() -> (Arc<Platform>, Arc<Repository>) {
let base_dir = Repository::base_dir().await;
if let Ok(metadata) = tokio::fs::metadata(base_dir.clone()).await {
if metadata.is_dir() {
tokio::fs::remove_dir_all(base_dir).await.unwrap();
}
}
let platform = Builder::new().enable_all().disable_config().build().await;
let _ = crate::config::install(platform.clone(), false).await;
let repository = crate::repository::create(&platform);
crate::repository::install(platform.clone(), repository.clone());
platform
.require::<Config>()
.load_from_string(
r#"
server:
port: 1503
"#,
None,
)
.unwrap();
Server::fork_and_await(&platform.require::<Server>()).await;
(platform, repository)
}
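    /// Awaits the next file event, giving up after two seconds.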
async fn await_update(listener: &mut FileEventReceiver) -> Option<FileEvent> {
tokio::select! {
event = listener.recv() => match event {
Ok(event) => Some(event),
_ => None
},
_ = tokio::time::sleep(Duration::from_secs(2)) => None
}
}
#[test]
fn test_epochs() {
log::info!("Acquiring shared resources...");
let _guard = crate::testing::SHARED_TEST_RESOURCES.lock().unwrap();
log::info!("Successfully acquired shared resources.");
test_async(async {
            let (platform, _) = setup_env().await;
let _ = query_redis_async(|con| redis::cmd("REPO.INC_EPOCH").query::<i64>(con)).await;
tokio::time::sleep(Duration::from_millis(150)).await;
let (foreground_epoch, background_epoch) =
query_redis_async(|con| redis::cmd("REPO.EPOCHS").query::<(i64, i64)>(con))
.await
.unwrap();
assert_eq!(foreground_epoch, 2);
assert_eq!(background_epoch, 2);
            platform.terminate();
        });
}
#[test]
fn ensure_repo_store_list_and_delete_works() {
log::info!("Acquiring shared resources...");
let _guard = crate::testing::SHARED_TEST_RESOURCES.lock().unwrap();
log::info!("Successfully acquired shared resources.");
test_async(async {
let (platform, repository) = setup_env().await;
let mut listener = repository.listener();
let repo_contents = query_redis_async(|con| {
redis::cmd("REPO.LIST")
.arg("raw")
.query::<Vec<Vec<(String, i64, String)>>>(con)
})
.await
.unwrap();
assert_eq!(repo_contents.len(), 0);
query_redis_async(|con| {
redis::cmd("REPO.STORE")
.arg("/test.yml")
.arg("test: true")
.query::<()>(con)
})
.await
.unwrap();
if let Some(FileEvent::FileChanged(file)) = await_update(&mut listener).await {
assert_eq!(file.name, "/test.yml")
} else {
panic!("Failed to receive file change event");
}
let repo_contents = query_redis_async(|con| {
redis::cmd("REPO.LIST")
.arg("raw")
.query::<Vec<Vec<(String, i64, String)>>>(con)
})
.await
.unwrap();
assert_eq!(repo_contents[0][0].0, "/test.yml");
let _ = query_redis_async(|con| {
redis::cmd("REPO.DELETE").arg("/test.yml").query::<()>(con)
})
.await;
if let Some(FileEvent::FileDeleted(file)) = await_update(&mut listener).await {
assert_eq!(file.name, "/test.yml")
} else {
panic!("Failed to receive file delete event");
}
let repo_contents = query_redis_async(|con| {
redis::cmd("REPO.LIST")
.arg("raw")
.query::<Vec<Vec<(String, i64, String)>>>(con)
})
.await
.unwrap();
assert_eq!(repo_contents.len(), 0);
platform.terminate();
});
}
#[test]
fn ensure_loaders_work() {
log::info!("Acquiring shared resources...");
let _guard = crate::testing::SHARED_TEST_RESOURCES.lock().unwrap();
log::info!("Successfully acquired shared resources.");
test_async(async {
let (platform, _) = setup_env().await;
crate::idb::install(platform.clone());
let _ = query_redis_async(|con| {
redis::cmd("REPO.STORE")
.arg("/test.yml")
.arg("test: true")
.query::<()>(con)
})
.await;
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents.len(), 0);
let _ = query_redis_async(|con| {
redis::cmd("REPO.STORE")
.arg("/loaders/test.yml")
.arg(
r#"
file: "/test.yml"
namespace: "test"
loader: "idb-yaml"
table: "loader-test"
indices: ["code"]
"#,
)
.query::<()>(con)
})
.await;
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents[0][0].1, "test.yml");
assert!(!repo_contents[0][0].3);
platform
.require::<Config>()
.load_from_string(
r#"
server:
port: 1503
repository:
namespaces: [ "test" ]
"#,
None,
)
.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents[0][0].1, "test.yml");
            assert!(repo_contents[0][0].3);
let _ = query_redis_async(|con| {
redis::cmd("REPO.STORE")
.arg("/loaders/test1.yml")
.arg(
r#"
file: "/test.yml"
namespace: "test"
loader: "idb-yaml"
table: "loader-test"
indices: ["code"]
"#,
)
.query::<()>(con)
})
.await;
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents.len(), 2);
assert_eq!(repo_contents[1][0].0, "test1.yml");
assert!(repo_contents[1][0].3);
let _ = query_redis_async(|con| {
redis::cmd("REPO.DELETE")
.arg("/loaders/test1.yml")
.query::<()>(con)
})
.await;
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents.len(), 1);
assert_eq!(repo_contents[0][0].1, "test.yml");
            assert!(repo_contents[0][0].3);
let mut last_load = &repo_contents[0][0].4;
let _ = query_redis_async(|con| {
redis::cmd("REPO.STORE")
.arg("/test.yml")
.arg("test: true")
.query::<()>(con)
})
.await;
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents[0][0].1, "test.yml");
            assert!(repo_contents[0][0].3);
assert_ne!(&repo_contents[0][0].4, last_load);
last_load = &repo_contents[0][0].4;
let _ = query_redis_async(|con| {
redis::cmd("REPO.STORE")
.arg("/loaders/test.yml")
.arg(
r#"
file: "/test.yml"
namespace: "test"
loader: "idb-yaml"
table: "loader-test"
indices: ["code"]
"#,
)
.query::<()>(con)
})
.await;
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents[0][0].1, "test.yml");
assert!(repo_contents[0][0].3);
assert_ne!(&repo_contents[0][0].4, last_load);
platform
.require::<Config>()
.load_from_string(
r#"
server:
port: 1503
repository:
namespaces: [ ]
"#,
None,
)
.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents[0][0].1, "test.yml");
assert!(!repo_contents[0][0].3);
let _ = query_redis_async(|con| {
redis::cmd("REPO.DELETE")
.arg("/loaders/test.yml")
.query::<()>(con)
})
.await;
tokio::time::sleep(Duration::from_millis(1000)).await;
let repo_contents = fetch_loaders().await;
assert_eq!(repo_contents.len(), 0);
platform.terminate();
});
}
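    /// Fetches the raw list of loader infos via REPO.LOADERS.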
async fn fetch_loaders() -> Vec<Vec<(String, String, String, bool, String, String)>> {
query_redis_async(|con| {
redis::cmd("REPO.LOADERS")
.arg("raw")
.query::<Vec<Vec<(String, String, String, bool, String, String)>>>(con)
})
.await
.unwrap()
}
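    /// Answers every request with "test: true". Paths containing "new" report the current time
    /// as Last-Modified, all other paths report a fixed timestamp in 2010 so that conditional
    /// fetches can be exercised.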
async fn mini_http_server(req: Request<Body>) -> Result<Response<Body>, Infallible> {
let mut response = Response::new("test: true".into());
if req.uri().path().contains("new") {
let _ = response.headers_mut().insert(
hyper::header::LAST_MODIFIED,
HeaderValue::from_str(Utc::now().to_rfc2822().as_str()).unwrap(),
);
} else {
let _ = response.headers_mut().insert(
hyper::header::LAST_MODIFIED,
HeaderValue::from_str(
Utc.with_ymd_and_hms(2010, 10, 10, 10, 10, 10)
.unwrap()
.to_rfc2822()
.as_str(),
)
.unwrap(),
);
}
Ok(response)
}
#[test]
fn ensure_fetch_works() {
log::info!("Acquiring shared resources...");
let _guard = crate::testing::SHARED_TEST_RESOURCES.lock().unwrap();
log::info!("Successfully acquired shared resources.");
test_async(async {
spawn!(async {
let server_addr: SocketAddr = "127.0.0.1:7979"
.parse::<SocketAddr>()
.expect("Unable to parse socket address");
let make_svc = make_service_fn(|_conn| async {
Ok::<_, Infallible>(service_fn(mini_http_server))
});
let server = hyper::server::Server::bind(&server_addr).serve(make_svc);
if let Err(e) = server.await {
panic!("server error: {}", e);
}
});
let (platform, repository) = setup_env().await;
let mut listener = repository.listener();
let _ = query_redis_async(|con| {
redis::cmd("REPO.FETCH")
.arg("/test.yml")
.arg("http://127.0.0.1:7979/file.yml")
.query::<()>(con)
})
.await;
if let Some(FileEvent::FileChanged(file)) = await_update(&mut listener).await {
assert_eq!(file.name, "/test.yml")
} else {
panic!("Failed to receive file change event");
}
let _ = query_redis_async(|con| {
redis::cmd("REPO.FETCH")
.arg("/test.yml")
.arg("http://127.0.0.1:7979/file.yml")
.query::<()>(con)
})
.await;
if await_update(&mut listener).await.is_some() {
panic!("Unexpected update event received...");
}
let _ = query_redis_async(|con| {
redis::cmd("REPO.FETCH")
.arg("/test.yml")
.arg("http://127.0.0.1:7979/new_file.yml")
.query::<()>(con)
})
.await;
if let Some(FileEvent::FileChanged(file)) = await_update(&mut listener).await {
assert_eq!(file.name, "/test.yml")
} else {
panic!("Failed to receive file change event");
}
let _ = query_redis_async(|con| {
redis::cmd("REPO.FETCH_FORCED")
.arg("/test.yml")
.arg("http://127.0.0.1:7979/file.yml")
.query::<()>(con)
})
.await;
if let Some(FileEvent::FileChanged(file)) = await_update(&mut listener).await {
assert_eq!(file.name, "/test.yml")
} else {
panic!("Failed to receive file change event");
}
platform.terminate();
});
}
}