springql_foreign_service/
source.rs1pub mod source_input;
4
5use anyhow::Result;
6use chrono::Duration;
7use std::io::Write;
8use std::net::{IpAddr, Shutdown, SocketAddr, TcpListener, TcpStream};
9use std::thread;
10
11use self::source_input::ForeignSourceInput;
12
13pub struct ForeignSource {
15 my_addr: SocketAddr,
16}
17
18impl ForeignSource {
19 pub fn start(input: ForeignSourceInput) -> Result<Self> {
20 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
21 let my_addr = listener.local_addr().unwrap();
22
23 let _ = thread::Builder::new()
24 .name("ForeignSource".into())
25 .spawn(move || {
26 let (stream, _sock) = listener.accept().unwrap();
27 stream.shutdown(Shutdown::Read).unwrap();
28 Self::stream_handler(stream, input).unwrap();
29 });
30
31 Ok(Self { my_addr })
32 }
33
34 pub fn host_ip(&self) -> IpAddr {
35 self.my_addr.ip()
36 }
37
38 pub fn port(&self) -> u16 {
39 self.my_addr.port()
40 }
41
42 fn stream_handler(mut stream: TcpStream, input: ForeignSourceInput) -> Result<()> {
43 log::info!(
44 "[ForeignSource] Connection from {}",
45 stream.peer_addr().unwrap()
46 );
47
48 for res_v in input {
49 let v = res_v.unwrap();
50 let mut json_s = v.to_string();
51 json_s.push('\n');
52 stream.write_all(json_s.as_bytes()).unwrap();
53
54 log::info!("[ForeignSource] Sent: {}", json_s);
55 }
56
57 log::info!("[ForeignSource] No message left. Wait forever...");
58 thread::sleep(Duration::hours(1).to_std().unwrap());
59
60 Ok(())
61 }
62}