use std::fmt::{Debug, Display};
use std::iter;
use std::sync::Arc;
use failure::Error;
use futures::future::{self, Future, IntoFuture};
use futures::sync::{mpsc, oneshot};
use futures::Stream;
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use spirit::helpers::Helper;
use spirit::validation::{Result as ValidationResult, Results as ValidationResults};
use spirit::{Builder, Spirit};
use structopt::StructOpt;
use tk_listen::ListenExt;
use base_traits::{Name, ResourceConfig, ResourceConsumer};
use net::{IntoIncoming, ListenLimits};
use runtime::Runtime;
/// Handle whose destruction tears down something living on the other end of a pair of
/// oneshot channels.
///
/// Both fields are wrapped in `Option` only so they can be moved out of `&mut self` inside
/// `drop`; they are expected to be `Some` for the whole life of the value.
struct RemoteDrop {
    /// Used to ask the other side to terminate.
    request_drop: Option<oneshot::Sender<()>>,
    /// Resolves once the other side confirms (or once its sender is dropped).
    drop_confirmed: Option<oneshot::Receiver<()>>,
}

impl Drop for RemoteDrop {
    /// Sends the termination request and then blocks the current thread until the
    /// confirmation channel resolves one way or the other.
    fn drop(&mut self) {
        trace!("Requesting remote drop");
        let requester = self.request_drop.take().unwrap();
        // A failed send means the receiving end no longer exists; nothing to stop then.
        let _ = requester.send(());
        let confirmation = self.drop_confirmed.take().unwrap();
        // Both a confirmation and a cancelled channel count as "done", so the result is ignored.
        let _ = confirmation.wait();
        trace!("Remote drop done");
    }
}
pub fn per_connection_init<Config, I, F, R, O, C, Ctx>(
init: I,
action: F,
) -> impl ResourceConsumer<Config, O, C>
where
Config: ListenLimits + ResourceConfig<O, C>,
Config::Resource: IntoIncoming,
I: Fn(&Arc<Spirit<O, C>>, &Arc<Config>, &mut Config::Resource, &str) -> Ctx
+ Send
+ Sync
+ 'static,
F: Fn(
&Arc<Spirit<O, C>>,
&Arc<Config>,
&mut Ctx,
<Config::Resource as IntoIncoming>::Connection,
&str,
) -> R
+ Sync
+ Send
+ 'static,
R: IntoFuture<Item = ()>,
R::Error: Display,
R::Future: Sync + Send + 'static,
O: Send + Sync + 'static,
C: Send + Sync + 'static,
Ctx: Send + Sync + 'static,
{
let action = Arc::new(action);
move |spirit: &_, config: &Arc<Config>, mut listener: Config::Resource, name: &str| {
let mut ctx = init(spirit, config, &mut listener, &name);
let spirit = Arc::clone(spirit);
let config = Arc::clone(config);
let max_conn = config.max_conn();
let action = Arc::clone(&action);
let name: Arc<str> = Arc::from(name.to_owned());
listener
.into_incoming()
.sleep_on_error(config.error_sleep())
.map(move |new_conn| {
let (done_send, done_recv) = oneshot::channel();
let name_err = Arc::clone(&name);
let handle_conn = action(&spirit, &config, &mut ctx, new_conn, &name)
.into_future()
.then(move |r| {
if let Err(e) = r {
error!("Failed to handle connection on {:?}: {}", name_err, e);
}
let _ = done_send.send(());
future::ok(())
});
tokio::spawn(handle_conn);
done_recv.then(|_| future::ok(()))
})
.listen(max_conn)
.map_err(|()| -> Error {
unreachable!("tk-listen never errors");
})
}
}
/// Creates a `ResourceConsumer` that runs `action` for every accepted connection.
///
/// This is `per_connection_init` without any per-listener state: the context is the unit
/// type and `action` never sees it. `action` receives the spirit handle, the configuration
/// of the listener, the new connection and the name of the resource.
pub fn per_connection<Config, F, R, O, C>(action: F) -> impl ResourceConsumer<Config, O, C>
where
    Config: ListenLimits + ResourceConfig<O, C>,
    Config::Resource: IntoIncoming,
    F: Fn(
            &Arc<Spirit<O, C>>,
            &Arc<Config>,
            <Config::Resource as IntoIncoming>::Connection,
            &str,
        ) -> R
        + Sync
        + Send
        + 'static,
    R: IntoFuture<Item = ()>,
    R::Error: Display,
    R::Future: Sync + Send + 'static,
    O: Send + Sync + 'static,
    C: Send + Sync + 'static,
{
    per_connection_init(
        // Nothing to initialize; the context is `()`.
        |_: &_, _: &_, _: &mut _, _: &str| {},
        // Forward everything except the (empty) context.
        move |spirit_handle: &_, cfg: &_, _: &mut (), conn, listener_name: &str| {
            action(spirit_handle, cfg, conn, listener_name)
        },
    )
}
/// Builds a `Helper` that creates resources described by the configuration and hands them to
/// `consumer`, re-evaluating the set on every configuration validation.
///
/// * `extract` pulls the resource configurations out of the whole configuration.
/// * `consumer` turns each created resource into a future, which is spawned onto tokio.
/// * `name` is passed to the `ResourceConfig`/`ResourceConsumer` calls and used in log messages.
///
/// When applied to a `Builder`, the helper lets `Config` and `Consumer` install themselves,
/// registers a config validator, adds a default `Runtime` singleton and, in `before_body`,
/// spawns the task that installs resources sent to it by the validator.
///
/// # Panics
///
/// The validator panics if the scale reported by a configuration is lower than the number of
/// instances carried over from the cache, and the success callback panics if the installer
/// task's receiving end is gone.
pub fn resources<Config, Consumer, E, R, O, C, N>(
mut extract: E,
consumer: Consumer,
name: N,
) -> impl Helper<O, C>
where
Config: ResourceConfig<O, C>,
Consumer: ResourceConsumer<Config, O, C>,
C: DeserializeOwned + Send + Sync + 'static,
O: Debug + StructOpt + Sync + Send + 'static,
E: FnMut(&C) -> R + Send + 'static,
R: IntoIterator<Item = Config>,
N: Name,
{
// Message from the validator to the installer task: one freshly forked resource together with
// the two channel ends that pair it with a `RemoteDrop`.
struct Install<O, C, Config: ResourceConfig<O, C>> {
resource: Config::Resource,
drop_req: oneshot::Receiver<()>,
confirm_drop: oneshot::Sender<()>,
config: Arc<Config>,
}
let (install_sender, install_receiver) = mpsc::unbounded::<Install<_, _, Config>>();
let name_validator = name.clone();
let name_builder = name.clone();
// The installer: for every received `Install`, build the consumer's future and spawn it.
let installer = move |spirit: &Arc<Spirit<O, C>>| {
let spirit = Arc::clone(spirit);
install_receiver.for_each(move |install| {
let Install {
resource,
drop_req,
confirm_drop,
config,
} = install;
// Rendered once and shared between the log messages below.
let cfg_str: Arc<str> = Arc::from(format!("{:?}", config));
let cfg_str_err = Arc::clone(&cfg_str);
let name = name.clone();
let name_err = name.clone();
debug!(
"Installing resource {} with config {}",
name.as_ref(),
cfg_str
);
let task = consumer
.build_future(&spirit, &config, resource, name.as_ref())
.map_err(move |e| {
error!(
"Task {} on config {} failed: {}",
name_err.as_ref(),
cfg_str_err,
e
);
})
// The task ends when either the consumer's future ends or the drop request resolves; a
// dropped sender (mapped to an error here) ends it just the same.
.select(drop_req.map_err(|_| ())) .then(move |orig| {
debug!("Terminated resource {} on cfg {}", name.as_ref(), cfg_str);
// `orig` is the outcome of the select, including whichever future did not finish. It is
// dropped explicitly so that happens before the confirmation is sent.
drop(orig); confirm_drop.send(())
})
.map_err(|_| ());
tokio::spawn(task)
})
};
// What the validator remembers about one resource configuration between runs.
struct CacheEntry<O, C, Config: ResourceConfig<O, C>> {
config: Arc<Config>,
seed: Arc<Config::Seed>,
// One handle per instance forked from the seed.
remote: Vec<Arc<RemoteDrop>>,
}
// Implemented by hand; every field is an `Arc` or a vector of them, so this only bumps
// reference counts.
impl<O, C, Config: ResourceConfig<O, C>> Clone for CacheEntry<O, C, Config> {
fn clone(&self) -> Self {
CacheEntry {
config: Arc::clone(&self.config),
seed: Arc::clone(&self.seed),
remote: self.remote.clone(),
}
}
}
let cache = Arc::new(Mutex::new(Vec::<CacheEntry<O, C, Config>>::new()));
// The validator computes the new cache and the list of resources to install, but commits both
// only from the `on_success` callback registered at its end.
let validator = move |_: &Arc<C>, cfg: &mut C, _: &O| -> ValidationResults {
let mut results = ValidationResults::new();
// The guard lives until the validator returns; the success callback locks the cache again later.
let orig_cache = cache.lock();
let mut new_cache = Vec::new();
let mut to_send = Vec::new();
for cfg in extract(cfg) {
let cfg = Arc::new(cfg);
// Reuse the entry (and with it the seed) of a previous configuration considered similar.
let previous = orig_cache
.iter()
.find(|orig| cfg.is_similar(&orig.config, name_validator.as_ref()))
.map(CacheEntry::clone);
let mut cache = if let Some(prev) = previous {
prev
} else {
let seed = match cfg.create(name_validator.as_ref()) {
Ok(seed) => Arc::new(seed),
Err(e) => {
results.merge(ValidationResult::from(e));
continue;
}
};
CacheEntry {
config: Arc::clone(&cfg),
seed,
remote: Vec::new(),
}
};
// Similar but not equal: keep the seed, but forget the instances in this copy so all of them
// get forked anew below. The old cache still holds its own `Arc`s to them.
if cache.config != cfg {
cache.remote.clear();
}
let (scale, scale_validation) = cfg.scaled(name_validator.as_ref());
results.merge(scale_validation);
// NOTE(review): there is no scale-down path; this panics if an unchanged configuration
// reports a smaller scale than it has instances. Presumably that cannot happen as long as
// `scaled` depends only on the configuration - confirm.
assert!(scale >= cache.remote.len());
// Fork the missing instances. A failed fork is reported and the loop moves on to the next one.
for _ in 0..scale - cache.remote.len() {
let resource = match cfg.fork(&cache.seed, name_validator.as_ref()) {
Ok(resource) => resource,
Err(e) => {
results.merge(ValidationResult::from(e));
continue;
}
};
// `drop_*` carries the termination request to the task, `confirm_*` carries the
// confirmation back; see `RemoteDrop`.
let (drop_send, drop_recv) = oneshot::channel();
let (confirm_send, confirm_recv) = oneshot::channel();
cache.remote.push(Arc::new(RemoteDrop {
request_drop: Some(drop_send),
drop_confirmed: Some(confirm_recv),
}));
to_send.push(Install {
config: Arc::clone(&cfg),
confirm_drop: confirm_send,
drop_req: drop_recv,
resource,
});
}
cache.config = cfg;
new_cache.push(cache);
}
let cache = Arc::clone(&cache);
let name = name_validator.clone();
let sender = install_sender.clone();
// Commit step: hand the new resources to the installer and replace the cache. Replacing it
// drops the old entries, and a `RemoteDrop` whose last `Arc` goes away with them terminates
// its task.
results.merge(ValidationResult::nothing().on_success(move || {
for install in to_send {
trace!(
"Sending {}/{:?} to the reactor",
name.as_ref(),
install.config
);
sender
.unbounded_send(install)
.expect("The tokio end got dropped");
}
*cache.lock() = new_cache;
debug!("New version of {} sent", name.as_ref());
}));
results
};
// The helper itself: wires everything above into the builder.
move |builder: Builder<O, C>| {
let builder = Config::install(builder, &name_builder);
let builder = Consumer::install(builder, &name_builder);
builder
.config_validator(validator)
.with_singleton(Runtime::default())
.before_body(move |spirit| {
tokio::spawn(installer(spirit));
Ok(())
})
}
}
/// Builds a `Helper` for exactly one resource.
///
/// A convenience wrapper around `resources` for the case where `extract` yields a single
/// configuration instead of a collection of them; `consumer` and `name` are passed through
/// unchanged.
pub fn resource<Config, Consumer, E, O, C, N>(
    mut extract: E,
    consumer: Consumer,
    name: N,
) -> impl Helper<O, C>
where
    Config: ResourceConfig<O, C>,
    Consumer: ResourceConsumer<Config, O, C>,
    C: DeserializeOwned + Send + Sync + 'static,
    O: Debug + StructOpt + Sync + Send + 'static,
    E: FnMut(&C) -> Config + Send + 'static,
    N: Name,
{
    // Present the single configuration as a one-element iterator.
    let extract_single = move |whole_config: &C| iter::once(extract(whole_config));
    resources(extract_single, consumer, name)
}