anor_api/service/
api_service.rs1use std::io::prelude::*;
2use std::net::{SocketAddr, TcpListener, TcpStream};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::mpsc::Sender;
5use std::sync::{Arc, Mutex};
6
7use log;
8
9use anor_storage::storage::{storage_item::StorageItem, Storage};
10use anor_utils::{config::Config, threadpool::ThreadPool};
11
12pub trait ApiService {
13 fn with_config(storage: Storage, config: Arc<Config>) -> Self;
14 fn start(&self, server_shutdown: Arc<AtomicBool>, signal_server_ready: Sender<()>) -> Result<(), String>;
15 fn stop(&self);
16 fn keys(&self) -> Vec<String>;
17 fn set_item(&self, key: &str, item: StorageItem) -> bool;
18 fn get_item(&self, key: &str) -> Option<StorageItem>;
19 fn remove_item(&self, key: &str) -> bool;
20}
21
22pub struct StorageApi {
23 storage: Storage,
24 config: Arc<Config>,
25}
26
27pub type AnorApiMutex<'a> = Arc<Mutex<StorageApi>>;
28
29impl ApiService for StorageApi {
30 fn with_config(storage: Storage, config: Arc<Config>) -> Self {
31 StorageApi { storage, config }
32 }
33
34 fn start(&self, shutdown: Arc<AtomicBool>, signal_ready_sender: Sender<()>) -> Result<(), String> {
35 assert!(self.config.api.is_some());
36 let config_server = self.config.api.as_ref().unwrap();
37 assert!(!config_server.listen_on.is_empty());
38 let listen_on = config_server.listen_on[0];
39
40 let listener = TcpListener::bind(listen_on).unwrap();
41
42 if let Err(err) = signal_ready_sender.send(()) {
44 return Err(err.to_string());
45 }
46
47 log::info!(
48 "Anor Storage API service listening on {} ...",
49 listen_on
50 );
51 let pool = ThreadPool::new(2);
54
55 while !shutdown.load(Ordering::SeqCst) {
56 match listener.accept() {
57 Ok((stream, addr)) => {
58 let shutdown_clone = shutdown.clone();
59 pool.execute(move || {
60 handle_connection(stream, addr, shutdown_clone);
61 });
62 }
63 Err(e) => log::error!("couldn't get client: {e:?}"),
72 }
73 }
74
75 Ok(())
76 }
77
78 fn stop(&self) {}
79
80 fn keys(&self) -> Vec<String> {
81 self.storage.keys()
82 }
83
84 fn set_item(&self, _key: &str, _item: StorageItem) -> bool {
85 false
86 }
87
88 fn get_item(&self, _key: &str) -> Option<StorageItem> {
89 None
90 }
91
92 fn remove_item(&self, key: &str) -> bool {
93 self.storage.remove(key);
94 true
95 }
96}
97
98fn handle_connection(mut stream: TcpStream, addr: SocketAddr, shutdown: Arc<AtomicBool>) {
99 log::debug!("Client connected: {}", addr);
100 let mut buf = [0; 1024];
101 let addr = stream.peer_addr().unwrap();
102 while !shutdown.load(Ordering::SeqCst) {
103 let count = stream.read(&mut buf).unwrap();
104 if log::log_enabled!(log::Level::Trace) {
105 log::trace!("Received bytes count from {} : {}", addr, count);
106 }
107
108 let mut vec = buf.to_vec();
109 vec.truncate(count);
110 let msg = String::from_utf8(vec).unwrap();
111
112 if log::log_enabled!(log::Level::Trace) {
113 log::trace!("Received message from {} : {}", addr, msg);
114 }
115 }
116}