1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//! General purpose TCP server.

#![deny(rust_2018_idioms, nonstandard_style)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]

mod accept;
mod builder;
mod config;
mod server;
mod service;
mod signals;
mod socket;
mod test_server;
mod waker_queue;
mod worker;

pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server;
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;

#[doc(hidden)]
pub use self::socket::FromStream;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Identifier handed out to each socket.
///
/// A `Token` doubles as its own generator: calling [`Token::next`] on a counter value yields
/// the current identifier and moves the counter forward by one.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize);

impl Token {
    /// Creates a counter whose first issued token is `Token(0)`.
    fn new() -> Self {
        Token(0)
    }

    /// Returns the token currently held by the counter, then increments the counter by one.
    pub(crate) fn next(&mut self) -> Token {
        // `Token` is `Copy`, so snapshot the current value before bumping the counter.
        let issued = *self;
        self.0 += 1;
        issued
    }
}

impl Default for Token {
    /// Same as [`Token::new`]: the counter starts at zero.
    fn default() -> Self {
        Token::new()
    }
}

/// Start server building process.
///
/// Returns the value produced by `ServerBuilder::default()`; this function performs no other
/// work itself.
pub fn new() -> ServerBuilder {
    ServerBuilder::default()
}

/// A bare-bones "join all" future.
///
/// It is only used while starting or stopping the server, so every inner future is simply
/// boxed and pinned on the heap; `pin_project` and structurally pinned futures would be
/// overkill for this task.
pub(crate) struct JoinAll<T> {
    fut: Vec<JoinFuture<T>>,
}

/// Wraps `fut` into a single [`JoinAll`] future.
///
/// The joined future resolves to a `Vec` holding one output per input future, in the same
/// order the futures were supplied.
pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + 'static>) -> JoinAll<T> {
    let mut slots: Vec<JoinFuture<T>> = Vec::with_capacity(fut.len());
    for pending in fut {
        slots.push(JoinFuture::Future(Box::pin(pending)));
    }

    JoinAll { fut: slots }
}

/// State of one slot inside [`JoinAll`].
enum JoinFuture<T> {
    /// The future has not produced its output yet.
    Future(Pin<Box<dyn Future<Output = T>>>),
    /// The future finished; the output stays here until `JoinAll` hands it out.
    Result(Option<T>),
}

// Inner futures are individually boxed and pinned, and finished outputs are only ever moved
// out by value, so `JoinAll` itself never relies on being pinned.
impl<T> Unpin for JoinAll<T> {}

impl<T> Future for JoinAll<T> {
    type Output = Vec<T>;

    /// Polls every unfinished inner future once.
    ///
    /// Resolves with all outputs (in input order) once no inner future reported `Pending`
    /// during this pass.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let slots = &mut self.get_mut().fut;

        let mut any_pending = false;
        for slot in slots.iter_mut() {
            // Slots that already hold a result are skipped; only live futures get polled.
            let outcome = match slot {
                JoinFuture::Future(inner) => inner.as_mut().poll(cx),
                JoinFuture::Result(_) => continue,
            };

            match outcome {
                Poll::Ready(output) => *slot = JoinFuture::Result(Some(output)),
                Poll::Pending => any_pending = true,
            }
        }

        if any_pending {
            return Poll::Pending;
        }

        // Every slot holds a result at this point; move the outputs out in order.
        let outputs = slots
            .iter_mut()
            .filter_map(|slot| match slot {
                JoinFuture::Result(output) => Some(output.take().unwrap()),
                JoinFuture::Future(_) => None,
            })
            .collect();

        Poll::Ready(outputs)
    }
}

#[cfg(test)]
mod test {
    use super::*;

    use actix_utils::future::ready;

    /// The joined outputs must come back in the order the futures were supplied.
    #[actix_rt::test]
    async fn test_join_all() {
        let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
        let mut outputs = join_all(futs).await.into_iter();

        for expected in vec![Ok(1), Err(3), Ok(9)] {
            assert_eq!(expected, outputs.next().unwrap());
        }
    }
}