1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use kitsune_p2p_types::codec::rmp_decode;
use kitsune_p2p_types::codec::rmp_encode;
use store::Store;
use warp::{hyper::body::Bytes, Filter};
// Process-wide counters, each starting at zero. Nothing in this part of the
// file reads or writes them; presumably the `now`, `random` and `put` handler
// modules bump the matching counter per request — TODO confirm in those modules.
static NOW: AtomicUsize = AtomicUsize::new(0);
static RANDOM: AtomicUsize = AtomicUsize::new(0);
static PUT: AtomicUsize = AtomicUsize::new(0);
// Private submodules. `clear`, `now`, `proxy_list`, `put` and `random` each
// expose a same-named function that `run` combines into the served routes;
// `store` provides the `Store` type that is handed to most of them.
mod clear;
mod now;
mod proxy_list;
mod put;
mod random;
mod store;
// Not referenced in this part of the file. Presumably the maximum accepted
// request body size in bytes, enforced by the handler modules — TODO confirm.
const SIZE_LIMIT: u64 = 1024;
// Number of seconds the background task in `run` sleeps between calls to
// `Store::prune`.
const PRUNE_EXPIRED_FREQ_S: u64 = 5;
/// Boxed `'static` future with no output, used by [`run`] to hand back the
/// server future.
///
/// NOTE(review): per warp's documentation the server only makes progress
/// while this future is polled, so callers are expected to await or spawn it.
pub type BootstrapDriver = futures::future::BoxFuture<'static, ()>;
/// Builds the bootstrap HTTP service and binds it to `addr`.
///
/// A fresh [`Store`] is created from `proxy_list`. Before binding, a
/// background task is spawned that sleeps `PRUNE_EXPIRED_FREQ_S` seconds and
/// then calls `Store::prune`, forever. Its join handle is dropped, so the
/// task is detached and is not stopped by anything in this function.
///
/// The routes are the `now`, `put`, `random`, `proxy_list` and `clear`
/// filters, combined with `or` in that order.
///
/// # Returns
///
/// On success, the boxed server future together with the socket address that
/// was actually bound.
///
/// # Errors
///
/// Returns `Err` with a `"Failed to bind socket: ..."` message when
/// `try_bind_ephemeral` cannot bind `addr`. The prune task has already been
/// spawned at that point and keeps running.
///
/// # Panics
///
/// NOTE(review): `tokio::task::spawn` is documented to panic when called
/// outside a Tokio runtime — confirm every caller runs this on one.
pub async fn run(
    addr: impl Into<SocketAddr> + 'static,
    proxy_list: Vec<String>,
) -> Result<(BootstrapDriver, SocketAddr), String> {
    let store = Store::new(proxy_list);

    // Detached housekeeping loop: sleep first, then prune, repeat forever.
    let prune_period = std::time::Duration::from_secs(PRUNE_EXPIRED_FREQ_S);
    let prune_store = store.clone();
    tokio::task::spawn(async move {
        loop {
            tokio::time::sleep(prune_period).await;
            prune_store.prune();
        }
    });

    // Every store-backed route gets its own handle; the last one takes
    // ownership of `store` itself.
    let routes = now::now()
        .or(put::put(store.clone()))
        .or(random::random(store.clone()))
        .or(proxy_list::proxy_list(store.clone()))
        .or(clear::clear(store));

    let (bound_addr, server) = warp::serve(routes)
        .try_bind_ephemeral(addr)
        .map_err(|e| format!("Failed to bind socket: {:?}", e))?;

    Ok((futures::future::FutureExt::boxed(server), bound_addr))
}
/// Builds an infallible filter that matches any request and extracts a clone
/// of `store`.
///
/// The filter owns `store`; each time it runs it yields a fresh clone rather
/// than giving up the original.
fn with_store(
    store: Store,
) -> impl Filter<Extract = (Store,), Error = std::convert::Infallible> + Clone {
    let provide_store = move || store.clone();
    warp::any().map(provide_store)
}