use crate::pipeline::{error::PipelineError, naming::PluginName};
pub mod any;
mod create;
pub(super) mod introspect;
mod output;
mod source;
mod transform;
pub use create::{create_many, create_one, CreationRequest, MultiCreationRequestBuilder, SingleCreationRequestBuilder};
pub use introspect::{list_elements, ElementListFilter, IntrospectionRequest};
pub use output::{output, OutputRequest, OutputRequestBuilder, RemainingDataStrategy};
pub use source::{source, SourceRequest, SourceRequestBuilder};
use tokio::sync::oneshot;
pub use transform::{transform, TransformRequest, TransformRequestBuilder};
use super::messages;
/// A control request that can be turned into a [`messages::ControlRequest`]
/// without knowing which plugin sends it.
///
/// Every implementor also gets [`PluginControlRequest`] through a blanket
/// impl that ignores the plugin name.
pub(super) trait AnonymousControlRequest {
    /// Value produced when the request succeeds.
    type OkResponse;

    /// Receiver handed out by [`Self::serialize_with_response`]; its `Ok`
    /// type is tied to [`Self::OkResponse`].
    type Receiver: ResponseReceiver<Ok = Self::OkResponse>;

    /// Consumes the request and converts it into a control message.
    fn serialize(self) -> messages::ControlRequest;

    /// Consumes the request and returns the control message together with a
    /// receiver (presumably the one on which the response to this very
    /// request arrives; confirm against the implementors).
    fn serialize_with_response(self) -> (messages::ControlRequest, Self::Receiver);
}
/// A control request whose conversion into a [`messages::ControlRequest`]
/// is given the name of a plugin.
pub(super) trait PluginControlRequest {
    /// Value produced when the request succeeds.
    type OkResponse;

    /// Receiver handed out by [`Self::serialize_with_response`]; its `Ok`
    /// type is tied to [`Self::OkResponse`].
    type Receiver: ResponseReceiver<Ok = Self::OkResponse>;

    /// Consumes the request and converts it into a control message, with
    /// `plugin` available to the conversion.
    fn serialize(self, plugin: &PluginName) -> messages::ControlRequest;

    /// Same as [`Self::serialize`], but additionally returns a receiver
    /// (presumably for the response to this request; confirm against the
    /// implementors).
    fn serialize_with_response(
        self,
        plugin: &PluginName,
    ) -> (messages::ControlRequest, Self::Receiver);
}
/// Every anonymous request is also usable where a plugin request is
/// expected: the plugin name is accepted and then ignored.
impl<R> PluginControlRequest for R
where
    R: AnonymousControlRequest,
{
    type OkResponse = <R as AnonymousControlRequest>::OkResponse;
    type Receiver = <R as AnonymousControlRequest>::Receiver;

    fn serialize(self, _plugin: &PluginName) -> messages::ControlRequest {
        // Fully qualified call: both traits declare a method named
        // `serialize`, so the anonymous one must be picked explicitly.
        <R as AnonymousControlRequest>::serialize(self)
    }

    fn serialize_with_response(
        self,
        _plugin: &PluginName,
    ) -> (messages::ControlRequest, Self::Receiver) {
        <R as AnonymousControlRequest>::serialize_with_response(self)
    }
}
/// Something that can be awaited once to obtain the outcome of a request.
pub(super) trait ResponseReceiver {
    /// Value carried by a successful response.
    type Ok;

    /// Consumes the receiver and waits for the response.
    ///
    /// The result is nested:
    /// - outer `Err(RecvError)`: no response could be received at all;
    /// - `Ok(Err(PipelineError))`: a response was received and it is an error;
    /// - `Ok(Ok(value))`: a response was received and it is a success.
    async fn recv(self) -> Result<Result<Self::Ok, PipelineError>, RecvError>;
}
/// Error returned by [`ResponseReceiver::recv`] when no response could be
/// received.
///
/// It carries no payload. It implements `Debug`, `Display` and
/// `std::error::Error` so that it can be logged, used with
/// `unwrap`/`expect`, and converted into `Box<dyn std::error::Error>`
/// with `?`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("the response to the control request could not be received")
    }
}

impl std::error::Error for RecvError {}
/// [`ResponseReceiver`] backed directly by a tokio oneshot channel that
/// carries `Result<R, PipelineError>`.
pub(super) struct DirectResponseReceiver<R>(
    /// Receiving half of the oneshot channel.
    oneshot::Receiver<Result<R, PipelineError>>,
);
impl<R> ResponseReceiver for DirectResponseReceiver<R> {
    type Ok = R;

    /// Awaits the wrapped oneshot receiver.
    ///
    /// Whatever the channel delivers is returned unchanged as the inner
    /// result. If awaiting the channel fails, the channel's own error is
    /// discarded and [`RecvError`] is returned instead.
    async fn recv(self) -> Result<Result<Self::Ok, PipelineError>, RecvError> {
        let Self(channel) = self;
        match channel.await {
            Ok(response) => Ok(response),
            Err(_channel_error) => Err(RecvError),
        }
    }
}
impl<R> From<oneshot::Receiver<Result<R, PipelineError>>> for DirectResponseReceiver<R> {
fn from(value: oneshot::Receiver<Result<R, PipelineError>>) -> Self {
Self(value)
}
}