use std::sync::mpsc::channel;
use std::thread;
use crate::artifact::{Artifact, ArtifactCreator, ArtifactCreatorFactory};
use crate::error::InternalError;
use super::message::PublishMessage;
use super::{
Batch, BatchExecutionResult, BatchVerifierFactory, PendingBatches, PublishHandle,
PublishingContext,
};
/// Factory for starting publishing threads.
///
/// A `PublishFactory` owns the two factories that `start` needs each time it is called: one that
/// produces an artifact creator and one that produces a batch verifier.
///
/// Type parameters:
///
/// * `B` - the batch type fed to the batch verifier
/// * `C` - the publishing context; it is cloned when a verifier is started and handed to the
///   artifact creator when an artifact is built
/// * `R` - the artifact type produced by the artifact creator
/// * `I` - the per-batch execution result produced by the verifier; the artifact creator takes a
///   `Vec<I>` as its input
// The boxed trait-object field types are spelled out in full, which trips clippy's
// `type_complexity` lint.
#[allow(clippy::type_complexity)]
pub struct PublishFactory<B, C, R, I>
where
B: 'static + Batch + Clone,
C: 'static + PublishingContext + Clone,
R: 'static + Artifact,
I: 'static + BatchExecutionResult,
{
/// Produces the artifact creator that turns the collected execution results into an artifact
/// of type `R`.
artifact_creator_factory: Box<
dyn ArtifactCreatorFactory<
ArtifactCreator = Box<dyn ArtifactCreator<Context = C, Input = Vec<I>, Artifact = R>>,
>,
>,
/// Produces the batch verifier that is started with the publishing context and receives the
/// pending batches.
batch_verifier_factory:
Box<dyn BatchVerifierFactory<Batch = B, Context = C, ExecutionResult = I>>,
}
// The constructor parameters repeat the struct's boxed trait-object field types in full, which
// trips clippy's `type_complexity` lint.
#[allow(clippy::type_complexity)]
impl<B, C, R, I> PublishFactory<B, C, R, I>
where
    B: Batch + Clone + 'static,
    C: PublishingContext + Clone + 'static,
    R: Artifact + 'static,
    I: BatchExecutionResult + 'static,
{
    /// Creates a new `PublishFactory` from its two component factories.
    ///
    /// # Arguments
    ///
    /// * `artifact_creator_factory` - factory for the artifact creator that builds an artifact of
    ///   type `R` from a `Vec<I>` of execution results
    /// * `batch_verifier_factory` - factory for the batch verifier that processes batches of type
    ///   `B` within a context of type `C`
    pub fn new(
        artifact_creator_factory: Box<
            dyn ArtifactCreatorFactory<
                ArtifactCreator = Box<
                    dyn ArtifactCreator<Context = C, Input = Vec<I>, Artifact = R>,
                >,
            >,
        >,
        batch_verifier_factory: Box<
            dyn BatchVerifierFactory<Batch = B, Context = C, ExecutionResult = I>,
        >,
    ) -> Self {
        // Both arguments are stored as-is; no work happens until `start` is called.
        Self {
            batch_verifier_factory,
            artifact_creator_factory,
        }
    }
}
impl<B, C, R, I> PublishFactory<B, C, R, I>
where
    B: 'static + Batch + Clone,
    C: 'static + PublishingContext + Clone,
    R: 'static + Artifact,
    I: 'static + BatchExecutionResult,
{
    /// Starts a publishing thread and returns a handle for controlling it.
    ///
    /// A batch verifier is started with a clone of `context`, and a new artifact creator is
    /// requested from the artifact creator factory. A background thread is then spawned that
    /// repeatedly drains `batches` into the verifier and blocks waiting for a `PublishMessage`:
    ///
    /// * `Cancel` - the verifier is cancelled and the thread finishes with `Ok(None)`
    /// * `Finish` - the verifier is finalized, its results are passed to the artifact creator
    ///   together with `context`, and the thread finishes with `Ok(Some(artifact))`
    /// * `Dropped` - the thread finishes with `Ok(None)`
    /// * `Next` - the thread loops, draining `batches` again before waiting for the next message
    ///
    /// Any error raised inside the thread is reduced to a string and becomes the thread's `Err`
    /// result; this includes the channel's sending side going away before a terminating message
    /// was received.
    ///
    /// # Arguments
    ///
    /// * `context` - the publishing context; cloned for the verifier, then moved into the thread
    /// * `batches` - the source of pending batches; moved into the thread
    ///
    /// # Errors
    ///
    /// Returns an `InternalError` if the batch verifier cannot be started, the artifact creator
    /// cannot be created, or the operating system fails to spawn the publishing thread.
    pub fn start(
        &mut self,
        mut context: C,
        mut batches: Box<dyn PendingBatches<B>>,
    ) -> Result<PublishHandle<R>, InternalError> {
        let (sender, receiver) = channel();
        let mut verifier = self.batch_verifier_factory.start(context.clone())?;
        let artifact_creator = self.artifact_creator_factory.new_creator()?;

        // `thread::spawn` panics when the OS cannot create a thread; `Builder::spawn` reports the
        // failure instead, so it can be surfaced through this function's `Result`.
        let join_handle = thread::Builder::new()
            .spawn(move || loop {
                // Hand every batch that is currently pending to the verifier.
                while let Some(batch) = batches.next().map_err(|err| err.reduce_to_string())? {
                    verifier
                        .add_batch(batch)
                        .map_err(|err| err.reduce_to_string())?;
                }

                // Block until the handle tells the thread what to do next.
                match receiver.recv() {
                    Ok(PublishMessage::Cancel) => {
                        verifier.cancel().map_err(|err| err.reduce_to_string())?;
                        return Ok(None);
                    }
                    Ok(PublishMessage::Finish) => {
                        let results = verifier.finalize().map_err(|err| err.reduce_to_string())?;
                        return Ok(Some(
                            artifact_creator
                                .create(&mut context, results)
                                .map_err(|err| err.reduce_to_string())?,
                        ));
                    }
                    Ok(PublishMessage::Dropped) => {
                        return Ok(None);
                    }
                    Ok(PublishMessage::Next) => {
                        // Nothing to do here: the next loop iteration starts by draining
                        // `batches`, which is exactly what this message asks for.
                    }
                    Err(err) => {
                        return Err(InternalError::from_source(Box::new(err)).reduce_to_string());
                    }
                };
            })
            .map_err(|err| InternalError::from_source(Box::new(err)))?;

        Ok(PublishHandle::new(sender, join_handle))
    }
}