springql_foreign_service/
source.rs

1// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2
3pub mod source_input;
4
5use anyhow::Result;
6use chrono::Duration;
7use std::io::Write;
8use std::net::{IpAddr, Shutdown, SocketAddr, TcpListener, TcpStream};
9use std::thread;
10
11use self::source_input::ForeignSourceInput;
12
13/// Runs as a TCP server and write(2)s foreign rows to socket.
14pub struct ForeignSource {
15    my_addr: SocketAddr,
16}
17
18impl ForeignSource {
19    pub fn start(input: ForeignSourceInput) -> Result<Self> {
20        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
21        let my_addr = listener.local_addr().unwrap();
22
23        let _ = thread::Builder::new()
24            .name("ForeignSource".into())
25            .spawn(move || {
26                let (stream, _sock) = listener.accept().unwrap();
27                stream.shutdown(Shutdown::Read).unwrap();
28                Self::stream_handler(stream, input).unwrap();
29            });
30
31        Ok(Self { my_addr })
32    }
33
34    pub fn host_ip(&self) -> IpAddr {
35        self.my_addr.ip()
36    }
37
38    pub fn port(&self) -> u16 {
39        self.my_addr.port()
40    }
41
42    fn stream_handler(mut stream: TcpStream, input: ForeignSourceInput) -> Result<()> {
43        log::info!(
44            "[ForeignSource] Connection from {}",
45            stream.peer_addr().unwrap()
46        );
47
48        for res_v in input {
49            let v = res_v.unwrap();
50            let mut json_s = v.to_string();
51            json_s.push('\n');
52            stream.write_all(json_s.as_bytes()).unwrap();
53
54            log::info!("[ForeignSource] Sent: {}", json_s);
55        }
56
57        log::info!("[ForeignSource] No message left. Wait forever...");
58        thread::sleep(Duration::hours(1).to_std().unwrap());
59
60        Ok(())
61    }
62}