#![allow(clippy::manual_async_fn)]
use std::future::Future;
use std::pin::Pin;
use ringline::{AsyncEventHandler, Config, ConnCtx, ParseResult, RinglineBuilder, connect};
struct ConnectHandler {
worker_id: usize,
target: std::net::SocketAddr,
}
impl AsyncEventHandler for ConnectHandler {
fn on_accept(&self, _conn: ConnCtx) -> impl Future<Output = ()> + 'static {
async {}
}
fn on_start(&self) -> Option<Pin<Box<dyn Future<Output = ()> + 'static>>> {
let target = self.target;
let worker_id = self.worker_id;
Some(Box::pin(async move {
eprintln!("[worker {worker_id}] connecting to {target}");
let connect_future = match connect(target) {
Ok(f) => f,
Err(e) => {
eprintln!("[worker {worker_id}] connect failed: {e}");
ringline::request_shutdown().ok();
return;
}
};
match connect_future.await {
Ok(conn) => {
eprintln!("[worker {worker_id}] connected to {target}");
let msg: &[u8] = b"Hello from ringline!\n";
if let Err(e) = conn.send_nowait(msg) {
eprintln!("[worker {worker_id}] send error: {e}");
return;
}
let n = conn
.with_data(|data| {
let text = String::from_utf8_lossy(data);
eprintln!("[worker {worker_id}] received: {}", text.trim());
ParseResult::Consumed(data.len())
})
.await;
if n == 0 {
eprintln!("[worker {worker_id}] connection closed by peer");
}
ringline::request_shutdown().ok();
}
Err(e) => {
eprintln!("[worker {worker_id}] connect failed: {e}");
ringline::request_shutdown().ok();
}
}
}))
}
fn create_for_worker(worker_id: usize) -> Self {
let target: std::net::SocketAddr = std::env::var("TARGET")
.unwrap_or_else(|_| "127.0.0.1:7878".to_string())
.parse()
.expect("invalid TARGET address");
eprintln!("[worker {worker_id}] will connect to {target}");
ConnectHandler { worker_id, target }
}
}
fn main() {
let mut config = Config::default();
config.worker.threads = 1;
config.worker.pin_to_core = false;
config.sq_entries = 64;
config.recv_buffer.ring_size = 64;
config.recv_buffer.buffer_size = 4096;
config.max_connections = 64;
eprintln!("starting connect_echo example (client-only mode)");
let (_shutdown, handles) = RinglineBuilder::new(config)
.launch::<ConnectHandler>()
.expect("failed to launch workers");
for handle in handles {
if let Err(e) = handle.join().expect("worker thread panicked") {
eprintln!("worker exited with error: {e}");
}
}
}