anor_api/service/
api_service.rs

1use std::io::prelude::*;
2use std::net::{SocketAddr, TcpListener, TcpStream};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::mpsc::Sender;
5use std::sync::{Arc, Mutex};
6
7use log;
8
9use anor_storage::storage::{storage_item::StorageItem, Storage};
10use anor_utils::{config::Config, threadpool::ThreadPool};
11
12pub trait ApiService {
13    fn with_config(storage: Storage, config: Arc<Config>) -> Self;
14    fn start(&self, server_shutdown: Arc<AtomicBool>, signal_server_ready: Sender<()>) -> Result<(), String>;
15    fn stop(&self);
16    fn keys(&self) -> Vec<String>;
17    fn set_item(&self, key: &str, item: StorageItem) -> bool;
18    fn get_item(&self, key: &str) -> Option<StorageItem>;
19    fn remove_item(&self, key: &str) -> bool;
20}
21
22pub struct StorageApi {
23    storage: Storage,
24    config: Arc<Config>,
25}
26
27pub type AnorApiMutex<'a> = Arc<Mutex<StorageApi>>;
28
29impl ApiService for StorageApi {
30    fn with_config(storage: Storage, config: Arc<Config>) -> Self {
31        StorageApi { storage, config }
32    }
33
34    fn start(&self, shutdown: Arc<AtomicBool>, signal_ready_sender: Sender<()>) -> Result<(), String> {
35        assert!(self.config.api.is_some());
36        let config_server = self.config.api.as_ref().unwrap();
37        assert!(!config_server.listen_on.is_empty());
38        let listen_on = config_server.listen_on[0];
39
40        let listener = TcpListener::bind(listen_on).unwrap();
41
42        // send the ready signal
43        if let Err(err) = signal_ready_sender.send(()) {
44            return Err(err.to_string());
45        }
46
47        log::info!(
48            "Anor Storage API service listening on {} ...",
49            listen_on
50        );
51        // listener.set_nonblocking(true).unwrap();
52
53        let pool = ThreadPool::new(2);
54
55        while !shutdown.load(Ordering::SeqCst) {
56            match listener.accept() {
57                Ok((stream, addr)) => {
58                    let shutdown_clone = shutdown.clone();
59                    pool.execute(move || {
60                        handle_connection(stream, addr, shutdown_clone);
61                    });
62                }
63                /*
64                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
65                    // wait until network socket is ready, typically implemented
66                    // via platform-specific APIs such as epoll or IOCP
67                    thread::sleep(time::Duration::from_millis(1));
68                    continue;
69                }
70                */
71                Err(e) => log::error!("couldn't get client: {e:?}"),
72            }
73        }
74
75        Ok(())
76    }
77
78    fn stop(&self) {}
79
80    fn keys(&self) -> Vec<String> {
81        self.storage.keys()
82    }
83
84    fn set_item(&self, _key: &str, _item: StorageItem) -> bool {
85        false
86    }
87
88    fn get_item(&self, _key: &str) -> Option<StorageItem> {
89        None
90    }
91
92    fn remove_item(&self, key: &str) -> bool {
93        self.storage.remove(key);
94        true
95    }
96}
97
98fn handle_connection(mut stream: TcpStream, addr: SocketAddr, shutdown: Arc<AtomicBool>) {
99    log::debug!("Client connected: {}", addr);
100    let mut buf = [0; 1024];
101    let addr = stream.peer_addr().unwrap();
102    while !shutdown.load(Ordering::SeqCst) {
103        let count = stream.read(&mut buf).unwrap();
104        if log::log_enabled!(log::Level::Trace) {
105            log::trace!("Received bytes count from {} : {}", addr, count);
106        }
107
108        let mut vec = buf.to_vec();
109        vec.truncate(count);
110        let msg = String::from_utf8(vec).unwrap();
111
112        if log::log_enabled!(log::Level::Trace) {
113            log::trace!("Received message from {} : {}", addr, msg);
114        }
115    }
116}