use futures_util::io::AsyncWriteExt;
use ringline::{AsyncEventHandler, Config, ConnCtx, ConnStream, RinglineBuilder};
/// Echo handler: one instance is built per worker via `create_for_worker`,
/// and `on_accept` echoes back whatever each accepted connection sends.
struct StreamEcho {
/// Id handed to `create_for_worker`; used only to tag this worker's log lines.
worker_id: usize,
}
impl AsyncEventHandler for StreamEcho {
    /// Builds the echo task for a freshly accepted connection.
    ///
    /// The returned future wraps `conn` in a `ConnStream` and then repeatedly
    /// reads whatever bytes are buffered, writes them straight back, and only
    /// then marks them as consumed. The task finishes when the read side
    /// yields an empty buffer (end of stream) or when a read or write fails;
    /// failures are logged to stderr and are not propagated.
    fn on_accept(&self, conn: ConnCtx) -> impl std::future::Future<Output = ()> + 'static {
        // Copy the id out of `self` so the `'static` future does not borrow the handler.
        let worker_id = self.worker_id;
        async move {
            eprintln!("[worker {worker_id}] accepted connection {}", conn.index());
            let mut stream = ConnStream::new(conn);
            loop {
                let pending = futures_util::AsyncBufReadExt::fill_buf(&mut stream).await;
                // The filled slice borrows `stream`, so take an owned copy
                // before the stream is borrowed again for writing.
                let chunk = match pending {
                    Err(e) => {
                        eprintln!("[worker {worker_id}] read error: {e}");
                        break;
                    }
                    // An empty buffer signals end of stream.
                    Ok(bytes) if bytes.is_empty() => break,
                    Ok(bytes) => bytes.to_vec(),
                };
                match stream.write_all(&chunk).await {
                    // Release the bytes from the read buffer only after they
                    // have been echoed back successfully.
                    Ok(_) => {
                        futures_util::AsyncBufReadExt::consume_unpin(&mut stream, chunk.len());
                    }
                    Err(e) => {
                        eprintln!("[worker {worker_id}] write error: {e}");
                        break;
                    }
                }
            }
            eprintln!("[worker {worker_id}] connection {} closed", conn.index());
        }
    }

    /// Creates the handler instance for worker `worker_id`, logging that the
    /// worker is starting.
    fn create_for_worker(worker_id: usize) -> Self {
        eprintln!("[worker {worker_id}] starting");
        Self { worker_id }
    }
}
/// Entry point: launches a single-worker stream echo server.
///
/// The bind address is taken from the first command-line argument and falls
/// back to `127.0.0.1:7878`. After launching, the function blocks until every
/// worker thread has been joined, logging any error a worker returned.
///
/// # Panics
///
/// Panics if the bind address cannot be parsed, if launching the workers
/// fails, or if a worker thread panicked.
fn main() {
    let bind_addr = match std::env::args().nth(1) {
        Some(addr) => addr,
        None => "127.0.0.1:7878".to_string(),
    };

    let mut config = Config::default();
    // Connection and submission-queue sizing.
    config.max_connections = 1024;
    config.sq_entries = 128;
    // Receive buffer ring: 128 buffers of 4096 bytes each.
    config.recv_buffer.buffer_size = 4096;
    config.recv_buffer.ring_size = 128;
    // One unpinned worker thread.
    config.worker.pin_to_core = false;
    config.worker.threads = 1;

    eprintln!("starting stream echo server on {bind_addr}");

    // `_shutdown` is bound to a name (not `_`) so it stays alive until `main`
    // returns. NOTE(review): presumably dropping it early would affect the
    // running workers; confirm against the ringline docs.
    let (_shutdown, handles) = RinglineBuilder::new(config)
        .bind(bind_addr.parse().expect("invalid bind address"))
        .launch::<StreamEcho>()
        .expect("failed to launch workers");

    for worker in handles {
        let outcome = worker.join().expect("worker thread panicked");
        match outcome {
            Ok(_) => {}
            Err(e) => eprintln!("worker exited with error: {e:?}"),
        }
    }
}