feophantlib/
feophant.rs

1use crate::{
2    codec::{NetworkFrame, PgCodec},
3    engine::{
4        io::block_layer::file_manager2::{FileManager2, FileManager2Error},
5        transactions::TransactionManager,
6        Engine,
7    },
8    processor::ClientProcessor,
9};
10use futures::{SinkExt, StreamExt};
11use std::{ffi::OsString, sync::Arc};
12use thiserror::Error;
13use tokio::{
14    net::TcpListener,
15    sync::oneshot::{error::RecvError, Sender},
16};
17use tokio::{
18    pin,
19    sync::mpsc::{error::SendError, UnboundedReceiver},
20};
21use tokio_util::codec::Framed;
22
/// A FeOphant server instance: the bound TCP listener together with the shared
/// state that is handed to each client connection.
///
/// Instances are built with `FeOphant::new` and run with `FeOphant::start`.
pub struct FeOphant {
    /// Port the listener is actually bound to; `new` reads it back from the socket
    /// rather than trusting the requested value.
    pub port: u16,
    /// Listening socket; `start` accepts client connections from it.
    listener: TcpListener,
    /// Transaction manager; a clone is given to every client task spawned by `start`.
    transaction_manager: TransactionManager,
    /// Engine; a clone is given to every client task spawned by `start`.
    engine: Engine,
    /// Shared handle to the file manager that `new` also passes to the engine.
    file_manager: Arc<FileManager2>,
}
30
31impl FeOphant {
32    pub async fn new(data_dir: OsString, port: u16) -> Result<FeOphant, FeOphantError> {
33        let file_manager = Arc::new(FileManager2::new(data_dir)?);
34        let transaction_manager = TransactionManager::new();
35        let engine = Engine::new(file_manager.clone(), transaction_manager.clone());
36
37        let listener = TcpListener::bind(format!("{}{}", "127.0.0.1:", port)).await?;
38        let port = listener.local_addr()?.port();
39        debug!("Bound to port {0}, but not processing yet.", port);
40
41        Ok(FeOphant {
42            port,
43            listener,
44            transaction_manager,
45            engine,
46            file_manager,
47        })
48    }
49
50    /// Starts up the actual server, should be started as its own task
51    /// Send on the shutdown_recv to shut it down.
52    pub async fn start(&self, shutdown_recv: UnboundedReceiver<Sender<()>>) {
53        let shutdown_sender: Option<Sender<()>>;
54        info!("Up and listening on port {}", self.port);
55
56        let listen = &self.listener;
57        pin!(shutdown_recv);
58        pin!(listen);
59
60        loop {
61            tokio::select! {
62                biased;
63                shut_sender = shutdown_recv.recv() => {
64                    if let Some(sender) = shut_sender {
65                        shutdown_sender = Some(sender);
66                        info!("Got shutdown request");
67                        break;
68                    }
69                }
70                listen_res = listen.accept() => {
71                    if let Ok((stream, client_addr)) = listen_res {
72                        info!("Got a connection from {}", client_addr);
73                        let tm = self.transaction_manager.clone();
74                        let eng = self.engine.clone();
75                        tokio::spawn(async move {
76                            let codec = PgCodec {};
77                            let (mut sink, mut input) = Framed::new(stream, codec).split();
78
79                            let mut process = ClientProcessor::new(eng, tm);
80                            while let Some(Ok(event)) = input.next().await {
81                                let responses: Vec<NetworkFrame> = match process.process(event).await {
82                                    Ok(responses) => responses,
83                                    Err(e) => {
84                                        warn!("Had a processing error {}", e);
85                                        break;
86                                    }
87                                };
88
89                                for response in responses {
90                                    match sink.send(response).await {
91                                        Ok(_) => {}
92                                        Err(e) => {
93                                            warn!("Unable to send response {}", e);
94                                            break;
95                                        }
96                                    }
97                                }
98                            }
99                        });
100                    } else if let Err(e) = listen_res {
101                        error!("Got error receiving a connection. {0}", e);
102                    }
103                }
104            };
105        }
106
107        //Clean up
108        match shutdown_sender {
109            Some(s) => {
110                debug!("Attempting to signal shutdown.");
111                s.send(())
112                    .unwrap_or_else(|_| warn!("Unable to signal shutdown."));
113            }
114            None => {
115                error!("Exitting before shutting down all the sockets!");
116            }
117        }
118    }
119}
120
/// Errors surfaced by the FeOphant server wrapper.
///
/// The `#[from]` variants let `?` convert the wrapped error types automatically;
/// `#[error(transparent)]` forwards their display text and source unchanged.
#[derive(Debug, Error)]
pub enum FeOphantError {
    /// The server was already started.
    /// NOTE(review): not constructed anywhere in the code visible here — confirm it is still used.
    #[error("FeOphant already started.")]
    AlreadyStarted(),
    /// A second start was attempted.
    /// NOTE(review): not constructed anywhere in the code visible here — confirm it is still used.
    #[error("Can't start the FeOphant twice")]
    CantStartTwice(),
    /// Failure reported by the file manager (e.g. from `FileManager2::new`).
    #[error(transparent)]
    FileManager2Error(#[from] FileManager2Error),
    /// I/O failure, such as binding the listener or reading its local address.
    #[error(transparent)]
    IOError(#[from] std::io::Error),
    /// A oneshot channel's sending half was dropped before a value arrived.
    #[error(transparent)]
    RecvError(#[from] RecvError),
    /// A shutdown request could not be queued on the mpsc shutdown channel.
    #[error(transparent)]
    ShutdownSendError(#[from] SendError<Sender<()>>),
}