cbsk_socket-0.1.0 has been yanked.
cbsk_socket is a socket callback tool
you can use cbsk_socket create tcp server or tcp client, you don't need to focus on TCP read and write, just focus on
business processing
now supported sockets
- tcp client √
- tcp server √
- ws client coming soon
- ws server coming soon
tcp server example
Cargo.toml file:
fast_log = "1.6.10"
cbsk_base = { version = "0.1.0" }
cbsk_run = { version = "0.1.1", default-features = false, features = ["async_pool"] }
cbsk_socket = { version = "0.1.0", features = ["tcp_server"] }
main.rs file:
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use cbsk_base::async_trait::async_trait;
use cbsk_base::{log, tokio};
use cbsk_base::tokio::task::JoinHandle;
use cbsk_socket::tcp::server::callback::TcpServerCallBack;
use cbsk_socket::tcp::server::client::TcpServerClient;
use cbsk_socket::tcp::server::config::TcpServerConfig;
use cbsk_socket::tcp::server::TcpServer;
use cbsk_socket::tcp::write_trait::WriteTrait;
pub static TCP_CLIENT: OnceLock<Arc<TcpServerClient>> = OnceLock::new();
#[tokio::main]
async fn main() {
let fast_config = fast_log::config::Config::default().console();
fast_log::init(fast_config).unwrap();
cbsk_run::async_pool::push(async {
TcpServerBusiness::start().await.unwrap()
});
cbsk_run::async_pool::push(async {
loop {
if let Some(tcp_client) = TCP_CLIENT.get() {
tcp_client.send_bytes(b"hello world").await;
tcp_client.send_json(&"hello world".to_string()).await;
tcp_client.send_text("hello world").await;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
cbsk_run::async_pool::listener().await.unwrap();
log::logger().flush();
}
pub struct TcpServerBusiness {}
impl TcpServerBusiness {
pub fn start() -> JoinHandle<()> {
let addr = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 8080);
let tcp_config = TcpServerConfig::new("test".into(), addr, false);
let tcp_server = TcpServer::new(tcp_config.into(), Self {}.into());
tcp_server.start::<1024>()
}
}
#[async_trait]
impl TcpServerCallBack for TcpServerBusiness {
async fn conn(&self, client: Arc<TcpServerClient>, handle: JoinHandle<()>) {
println!("a new connect come in: {}", client.log_head);
if TCP_CLIENT.set(client.clone()).is_err() {
println!("set global tcp client fail");
}
cbsk_run::async_pool::push(async move {
if let Err(e) = handle.await {
eprintln!("{} run error: {e:?}", client.log_head);
}
})
}
async fn dis_conn(&self, client: Arc<TcpServerClient>) {
println!("{} tcp client disconnect", client.log_head)
}
async fn recv(&self, mut bytes: Vec<u8>, client: Arc<TcpServerClient>) -> Vec<u8> {
println!("{} read bytes [{bytes:?}]", client.log_head);
if bytes.len() < 10 {
return bytes;
}
if bytes.len() > 13 {
let next_bytes = bytes.split_off(10);
println!("{} valid data: [{bytes:?}]", client.log_head);
return next_bytes;
}
Vec::new()
}
}
tcp client example
Cargo.toml file:
fast_log = "1.6.10"
cbsk_base = { version = "0.1.0" }
cbsk_run = { version = "0.1.1", default-features = false, features = ["async_pool"] }
cbsk_socket = { version = "0.1.0", features = ["tcp_client"] }
main.rs file:
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use cbsk_base::{log, tokio};
use cbsk_base::async_trait::async_trait;
use cbsk_base::tokio::task::JoinHandle;
use cbsk_socket::config::re_conn::SocketReConn;
use cbsk_socket::tcp::client::callback::TcpClientCallBack;
use cbsk_socket::tcp::client::config::TcpClientConfig;
use cbsk_socket::tcp::client::TcpClient;
use cbsk_socket::tcp::write_trait::WriteTrait;
pub static TCP_CLIENT: OnceLock<Arc<TcpClient<TcpClientBusiness>>> = OnceLock::new();
#[tokio::main]
async fn main() {
let fast_config = fast_log::config::Config::default().console();
fast_log::init(fast_config).unwrap();
cbsk_run::async_pool::push(async {
TcpClientBusiness::start().await.unwrap();
});
cbsk_run::async_pool::push(async {
loop {
if let Some(tcp_client) = TCP_CLIENT.get() {
tcp_client.send_bytes(b"hello world").await;
tcp_client.send_json(&"hello world".to_string()).await;
tcp_client.send_text("hello world").await;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
cbsk_run::async_pool::listener().await.unwrap();
log::logger().flush();
}
pub struct TcpClientBusiness {}
impl TcpClientBusiness {
pub fn start() -> JoinHandle<()> {
let addr = SocketAddr::new(IpAddr::from([127, 0, 0, 1]), 8080);
let tcp_config = TcpClientConfig::new("test".into(), addr, SocketReConn::enable(Duration::from_secs(3)));
let tcp_client = TcpClient::new(tcp_config.into(), Self {}.into());
let handle = tcp_client.start::<1024>();
if TCP_CLIENT.set(tcp_client.into()).is_err() {
println!("set global tcp client fail");
}
handle
}
}
#[async_trait]
impl TcpClientCallBack for TcpClientBusiness {
async fn conn(&self) {
println!("connect tcp server success");
}
async fn dis_conn(&self) {
println!("disconnect tcp server");
}
async fn re_conn(&self, num: i32) {
println!("re connect to tcp server, re num is {num}");
}
async fn recv(&self, mut bytes: Vec<u8>) -> Vec<u8> {
println!("read bytes [{bytes:?}]");
if bytes.len() < 10 {
return bytes;
}
if bytes.len() > 13 {
let next_bytes = bytes.split_off(10);
println!("valid data: [{bytes:?}]");
return next_bytes;
}
Vec::new()
}
}