1use crate::{
2 codec::{NetworkFrame, PgCodec},
3 engine::{
4 io::block_layer::file_manager2::{FileManager2, FileManager2Error},
5 transactions::TransactionManager,
6 Engine,
7 },
8 processor::ClientProcessor,
9};
10use futures::{SinkExt, StreamExt};
11use std::{ffi::OsString, sync::Arc};
12use thiserror::Error;
13use tokio::{
14 net::TcpListener,
15 sync::oneshot::{error::RecvError, Sender},
16};
17use tokio::{
18 pin,
19 sync::mpsc::{error::SendError, UnboundedReceiver},
20};
21use tokio_util::codec::Framed;
22
23pub struct FeOphant {
24 pub port: u16,
25 listener: TcpListener,
26 transaction_manager: TransactionManager,
27 engine: Engine,
28 file_manager: Arc<FileManager2>,
29}
30
31impl FeOphant {
32 pub async fn new(data_dir: OsString, port: u16) -> Result<FeOphant, FeOphantError> {
33 let file_manager = Arc::new(FileManager2::new(data_dir)?);
34 let transaction_manager = TransactionManager::new();
35 let engine = Engine::new(file_manager.clone(), transaction_manager.clone());
36
37 let listener = TcpListener::bind(format!("{}{}", "127.0.0.1:", port)).await?;
38 let port = listener.local_addr()?.port();
39 debug!("Bound to port {0}, but not processing yet.", port);
40
41 Ok(FeOphant {
42 port,
43 listener,
44 transaction_manager,
45 engine,
46 file_manager,
47 })
48 }
49
50 pub async fn start(&self, shutdown_recv: UnboundedReceiver<Sender<()>>) {
53 let shutdown_sender: Option<Sender<()>>;
54 info!("Up and listening on port {}", self.port);
55
56 let listen = &self.listener;
57 pin!(shutdown_recv);
58 pin!(listen);
59
60 loop {
61 tokio::select! {
62 biased;
63 shut_sender = shutdown_recv.recv() => {
64 if let Some(sender) = shut_sender {
65 shutdown_sender = Some(sender);
66 info!("Got shutdown request");
67 break;
68 }
69 }
70 listen_res = listen.accept() => {
71 if let Ok((stream, client_addr)) = listen_res {
72 info!("Got a connection from {}", client_addr);
73 let tm = self.transaction_manager.clone();
74 let eng = self.engine.clone();
75 tokio::spawn(async move {
76 let codec = PgCodec {};
77 let (mut sink, mut input) = Framed::new(stream, codec).split();
78
79 let mut process = ClientProcessor::new(eng, tm);
80 while let Some(Ok(event)) = input.next().await {
81 let responses: Vec<NetworkFrame> = match process.process(event).await {
82 Ok(responses) => responses,
83 Err(e) => {
84 warn!("Had a processing error {}", e);
85 break;
86 }
87 };
88
89 for response in responses {
90 match sink.send(response).await {
91 Ok(_) => {}
92 Err(e) => {
93 warn!("Unable to send response {}", e);
94 break;
95 }
96 }
97 }
98 }
99 });
100 } else if let Err(e) = listen_res {
101 error!("Got error receiving a connection. {0}", e);
102 }
103 }
104 };
105 }
106
107 match shutdown_sender {
109 Some(s) => {
110 debug!("Attempting to signal shutdown.");
111 s.send(())
112 .unwrap_or_else(|_| warn!("Unable to signal shutdown."));
113 }
114 None => {
115 error!("Exitting before shutting down all the sockets!");
116 }
117 }
118 }
119}
120
121#[derive(Debug, Error)]
122pub enum FeOphantError {
123 #[error("FeOphant already started.")]
124 AlreadyStarted(),
125 #[error("Can't start the FeOphant twice")]
126 CantStartTwice(),
127 #[error(transparent)]
128 FileManager2Error(#[from] FileManager2Error),
129 #[error(transparent)]
130 IOError(#[from] std::io::Error),
131 #[error(transparent)]
132 RecvError(#[from] RecvError),
133 #[error(transparent)]
134 ShutdownSendError(#[from] SendError<Sender<()>>),
135}