Crate async_listen

Async Listen

The crate contains various helpers for writing production-ready servers in Rust using async-std.

Docs | GitHub | Crate

Utilities

  • ListenExt -- extension trait for a stream of accepted sockets; provides useful combinators for the stream

Low-Level Utilities

  • is_transient_error -- returns true if the error is transient

Example

Here is a quite elaborate example that demonstrates:

  • Backpressure (limit on the number of simultaneous connections)
  • Error handling
  • Unification of TCP and Unix sockets

use std::env::args;
use std::error::Error;
use std::fs::remove_file;
use std::io;
use std::time::Duration;

use async_std::task;
use async_std::net::TcpListener;
use async_std::prelude::*;

use async_listen::{ListenExt, ByteStream, backpressure};


fn main() -> Result<(), Box<dyn Error>> {
    let (_, bp) = backpressure::new(10);
    #[cfg(unix)] {
        use async_std::os::unix::net::UnixListener;

        if args().any(|x| x == "--unix") {
            remove_file("./example.sock").ok();
            return task::block_on(async {
                let listener = UnixListener::bind("./example.sock").await?;
                eprintln!("Accepting connections on ./example.sock");
                let mut incoming = listener.incoming()
                    .log_warnings(|e| eprintln!("Error: {}. Sleeping 0.5s", e))
                    .handle_errors(Duration::from_millis(500))
                    .backpressure_wrapper(bp);
                while let Some(stream) = incoming.next().await {
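                    task::spawn(async {
                        // Log per-connection errors, as the TCP branch does.
                        if let Err(e) = connection_loop(stream).await {
                            eprintln!("Error: {}", e);
                        }
                    });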
                }
                Ok(())
            });
        }
    }
    task::block_on(async {
        let listener = TcpListener::bind("localhost:8080").await?;
        eprintln!("Accepting connections on localhost:8080");
        let mut incoming = listener.incoming()
            .log_warnings(|e| eprintln!("Error: {}. Sleeping 0.5s", e))
            .handle_errors(Duration::from_millis(500))
            .backpressure_wrapper(bp);
        while let Some(stream) = incoming.next().await {
            task::spawn(async {
                if let Err(e) = connection_loop(stream).await {
                    eprintln!("Error: {}", e);
                }
            });
        }
        Ok(())
    })
}

async fn connection_loop(mut stream: ByteStream) -> Result<(), io::Error> {
    println!("Connected from {}", stream.peer_addr()?);
    task::sleep(Duration::from_secs(5)).await;
    stream.write_all(b"hello\n").await?;
    Ok(())
}
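
The example above caps the number of simultaneous connections at 10. As a rough illustration of that idea (not the crate's implementation), the std-only sketch below hands out one slot per connection and frees it when the slot is dropped. `Limiter` and `Slot` are hypothetical names, and a blocking `Condvar` stands in for the async waiting a real server would use.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Shared state: the number of active connections and the configured limit.
struct Inner {
    active: Mutex<usize>,
    freed: Condvar,
    limit: usize,
}

/// Hypothetical connection limiter (an assumption, not a type from the crate).
#[derive(Clone)]
pub struct Limiter(Arc<Inner>);

/// Held for the lifetime of one connection; dropping it frees the slot.
pub struct Slot(Arc<Inner>);

impl Limiter {
    pub fn new(limit: usize) -> Self {
        Limiter(Arc::new(Inner {
            active: Mutex::new(0),
            freed: Condvar::new(),
            limit,
        }))
    }

    /// Take a slot if one is free, without blocking.
    pub fn try_acquire(&self) -> Option<Slot> {
        let mut active = self.0.active.lock().unwrap();
        if *active < self.0.limit {
            *active += 1;
            Some(Slot(self.0.clone()))
        } else {
            None
        }
    }

    /// Block the calling thread until a slot is free, then take it.
    pub fn acquire(&self) -> Slot {
        let mut active = self.0.active.lock().unwrap();
        while *active >= self.0.limit {
            active = self.0.freed.wait(active).unwrap();
        }
        *active += 1;
        Slot(self.0.clone())
    }

    /// Number of slots currently held.
    pub fn active(&self) -> usize {
        *self.0.active.lock().unwrap()
    }
}

impl Drop for Slot {
    fn drop(&mut self) {
        // Release the slot and wake one waiter, if any.
        *self.0.active.lock().unwrap() -= 1;
        self.0.freed.notify_one();
    }
}
```

An accept loop built on this sketch would call `acquire()` before accepting and move the returned `Slot` into the connection handler, so the slot is released automatically when the handler finishes, whether it succeeds or fails.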

Modules

backpressure

Backpressure handling structures

wrapper_types

This module exports all the public wrapper types that the library uses

Structs

ByteStream

A wrapper around TcpStream and UnixStream

Enums

PeerAddr

A peer address for either a TCP or a Unix socket
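
To show how one address type can cover both socket families, here is a minimal std-only sketch. The enum name `Peer`, its variants, and the `<unnamed>` placeholder are assumptions made for illustration; the crate's `PeerAddr` may be shaped differently.

```rust
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;

/// Hypothetical stand-in for a unified peer address (not the crate's type).
#[derive(Debug, Clone, PartialEq)]
pub enum Peer {
    /// Address of a TCP peer.
    Tcp(SocketAddr),
    /// Path of a Unix socket peer; client sockets are often unnamed.
    Unix(Option<PathBuf>),
}

impl fmt::Display for Peer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Peer::Tcp(addr) => write!(f, "{}", addr),
            Peer::Unix(Some(path)) => write!(f, "{}", path.display()),
            Peer::Unix(None) => write!(f, "<unnamed>"),
        }
    }
}
```

With a `Display` implementation like this, a single logging statement can print the peer for either kind of socket, much as the `println!` call in the crate-level example does.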

Traits

ListenExt

An extension trait that provides necessary combinators for turning a stream of accept() events into a full-featured connection listener

Functions

is_transient_error

Returns true if the error is transient
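
As a hedged illustration of what "transient" could mean for an accept loop, the sketch below treats errors that concern only a single incoming connection as safe to skip, and asks for a pause after anything else. The function names and the exact list of error kinds are assumptions; the crate's own `is_transient_error` may classify errors differently.

```rust
use std::io;
use std::time::Duration;

/// Hypothetical classifier: errors tied to one connection are treated as
/// transient, so the accept loop can continue immediately.
pub fn looks_transient(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}

/// How long an accept loop might sleep after `e`: not at all for transient
/// errors, `delay` for everything else (for example, resource exhaustion).
pub fn pause_after(e: &io::Error, delay: Duration) -> Option<Duration> {
    if looks_transient(e) {
        None
    } else {
        Some(delay)
    }
}
```

Sleeping after non-transient errors keeps the loop from spinning at full speed while the underlying condition persists, which is the role the 500 ms delay plays in the crate-level example.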