use std::io::prelude::*;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use log;
use anor_storage::storage::{storage_item::StorageItem, Storage};
use anor_utils::{config::Config, threadpool::ThreadPool};
pub trait ApiService {
fn with_config(storage: Storage, config: Arc<Config>) -> Self;
fn start(&self, server_shutdown: Arc<AtomicBool>, signal_server_ready: Sender<()>) -> Result<(), String>;
fn stop(&self);
fn keys(&self) -> Vec<String>;
fn set_item(&self, key: &str, item: StorageItem) -> bool;
fn get_item(&self, key: &str) -> Option<StorageItem>;
fn remove_item(&self, key: &str) -> bool;
}
pub struct StorageApi {
storage: Storage,
config: Arc<Config>,
}
pub type AnorApiMutex<'a> = Arc<Mutex<StorageApi>>;
impl ApiService for StorageApi {
fn with_config(storage: Storage, config: Arc<Config>) -> Self {
StorageApi { storage, config }
}
fn start(&self, shutdown: Arc<AtomicBool>, signal_ready_sender: Sender<()>) -> Result<(), String> {
assert!(self.config.api.is_some());
let config_server = self.config.api.as_ref().unwrap();
assert!(!config_server.listen_on.is_empty());
let listen_on = config_server.listen_on[0];
let listener = TcpListener::bind(listen_on).unwrap();
if let Err(err) = signal_ready_sender.send(()) {
return Err(err.to_string());
}
log::info!(
"Anor Storage API service listening on {} ...",
listen_on
);
let pool = ThreadPool::new(2);
while !shutdown.load(Ordering::SeqCst) {
match listener.accept() {
Ok((stream, addr)) => {
let shutdown_clone = shutdown.clone();
pool.execute(move || {
handle_connection(stream, addr, shutdown_clone);
});
}
Err(e) => log::error!("couldn't get client: {e:?}"),
}
}
Ok(())
}
fn stop(&self) {}
fn keys(&self) -> Vec<String> {
self.storage.keys()
}
fn set_item(&self, _key: &str, _item: StorageItem) -> bool {
false
}
fn get_item(&self, _key: &str) -> Option<StorageItem> {
None
}
fn remove_item(&self, key: &str) -> bool {
self.storage.remove(key);
true
}
}
fn handle_connection(mut stream: TcpStream, addr: SocketAddr, shutdown: Arc<AtomicBool>) {
log::debug!("Client connected: {}", addr);
let mut buf = [0; 1024];
let addr = stream.peer_addr().unwrap();
while !shutdown.load(Ordering::SeqCst) {
let count = stream.read(&mut buf).unwrap();
if log::log_enabled!(log::Level::Trace) {
log::trace!("Received bytes count from {} : {}", addr, count);
}
let mut vec = buf.to_vec();
vec.truncate(count);
let msg = String::from_utf8(vec).unwrap();
if log::log_enabled!(log::Level::Trace) {
log::trace!("Received message from {} : {}", addr, msg);
}
}
}