1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;

use kitsune_p2p_types::codec::rmp_decode;
use kitsune_p2p_types::codec::rmp_encode;
use store::Store;
use warp::{hyper::body::Bytes, Filter};

static NOW: AtomicUsize = AtomicUsize::new(0);
static RANDOM: AtomicUsize = AtomicUsize::new(0);
static PUT: AtomicUsize = AtomicUsize::new(0);

mod clear;
mod now;
mod proxy_list;
mod put;
mod random;
mod store;

/// No reason to accept a peer data bigger then 1KB.
// TODO: Maybe even that's too high?
const SIZE_LIMIT: u64 = 1024;

/// how often should we prune the expired entries?
const PRUNE_EXPIRED_FREQ_S: u64 = 5;

pub type BootstrapDriver = futures::future::BoxFuture<'static, ()>;

pub async fn run(
    addr: impl Into<SocketAddr> + 'static,
    proxy_list: Vec<String>,
) -> Result<(BootstrapDriver, SocketAddr), String> {
    let store = Store::new(proxy_list);
    {
        let store = store.clone();
        tokio::task::spawn(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(PRUNE_EXPIRED_FREQ_S)).await;
                store.prune();
            }
        });
    }
    let boot = now::now()
        .or(put::put(store.clone()))
        .or(random::random(store.clone()))
        .or(proxy_list::proxy_list(store.clone()))
        .or(clear::clear(store));
    match warp::serve(boot).try_bind_ephemeral(addr) {
        Ok((addr, server)) => {
            let driver = futures::future::FutureExt::boxed(server);
            Ok((driver, addr))
        }
        Err(e) => Err(format!("Failed to bind socket: {:?}", e)),
    }
}

fn with_store(
    store: Store,
) -> impl Filter<Extract = (Store,), Error = std::convert::Infallible> + Clone {
    warp::any().map(move || store.clone())
}