use std::any::Any;
use std::time::Duration;
use arrow_schema::Field;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::ColumnarValue;
use datafusion::scalar::ScalarValue;
use smol_str::SmolStr;
use crate::capability::SideEffects;
use crate::errors::FnError;
use crate::traits::connector::Principal;
use crate::traits::scalar::ArgType;
/// Interface a procedure implementation exposes: a signature describing it
/// and an `invoke` entry point that runs it.
///
/// Implementors are required to be `Send + Sync`.
pub trait ProcedurePlugin: Sync + Send {
    /// Returns the [`ProcedureSignature`] describing this procedure
    /// (arguments, yielded fields, mode, and the remaining metadata).
    fn signature(&self) -> &ProcedureSignature;

    /// Runs the procedure and hands back a record-batch stream.
    ///
    /// `ctx` is taken by value and carries the optional host, deadline and
    /// principal for this call; `args` holds the argument values.
    ///
    /// NOTE(review): presumably `args` is positional and lines up with
    /// `signature().args` — confirm against the caller.
    ///
    /// # Errors
    ///
    /// Returns an [`FnError`] when the implementation cannot produce a stream.
    fn invoke(
        &self,
        ctx: ProcedureContext<'_>,
        args: &[ColumnarValue],
    ) -> Result<SendableRecordBatchStream, FnError>;
}
/// Description of a procedure, as returned by
/// [`ProcedurePlugin::signature`].
#[derive(Debug, Clone)]
pub struct ProcedureSignature {
    /// Declared arguments; each entry carries a name, a type, an optional
    /// default value and its own doc text.
    pub args: Vec<NamedArgType>,
    /// Fields the procedure yields.
    ///
    /// NOTE(review): presumably the column schema of the stream returned by
    /// `invoke` — confirm.
    pub yields: Vec<Field>,
    /// Mode the procedure declares (see [`ProcedureMode`]).
    pub mode: ProcedureMode,
    /// Side effects the procedure declares.
    pub side_effects: SideEffects,
    /// Retry contract, if the procedure declares one.
    pub retry_contract: Option<RetryContract>,
    /// Batch input shape, if the procedure declares one.
    pub batch_input: Option<BatchInputShape>,
    /// Free-form documentation text for the procedure.
    pub docs: String,
}
/// One named argument in a [`ProcedureSignature`].
#[derive(Debug, Clone)]
pub struct NamedArgType {
    /// Argument name.
    pub name: SmolStr,
    /// Argument type.
    pub ty: ArgType,
    /// Default value, if any.
    ///
    /// NOTE(review): presumably used when the caller omits the argument —
    /// the code that applies it is not visible here; confirm.
    pub default: Option<ScalarValue>,
    /// Free-form documentation text for this argument.
    pub doc: String,
}
/// Mode a procedure declares in [`ProcedureSignature::mode`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ProcedureMode {
    /// NOTE(review): presumably a procedure that only reads data — confirm.
    Read,
    /// NOTE(review): presumably a procedure that may modify data — confirm.
    Write,
    /// NOTE(review): presumably a procedure that may modify schema — confirm.
    Schema,
    /// NOTE(review): presumably an administrative (DBMS-level) procedure —
    /// confirm.
    Dbms,
}
/// Retry contract a procedure may declare in
/// [`ProcedureSignature::retry_contract`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RetryContract {
    /// NOTE(review): presumably the invocation may be re-run as a unit —
    /// confirm with the code that performs retries.
    Atomic {
        /// Retry bound carried by this contract.
        ///
        /// NOTE(review): whether the first attempt counts towards it is not
        /// visible here — confirm.
        max_retries: u32,
    },
}
/// Batch input shape a procedure may declare in
/// [`ProcedureSignature::batch_input`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BatchInputShape {
    /// NOTE(review): presumably the batch is supplied as a set of rows —
    /// confirm with the caller that builds the input.
    Rows,
}
/// Host object that can be attached to a [`ProcedureContext`].
///
/// Implementors must be `Send + Sync` and `Any`, and expose themselves as
/// `&dyn Any` so a caller can downcast to the concrete host type.
pub trait ProcedureHost: Any + Send + Sync {
    /// Returns this host as `&dyn Any`, for use with
    /// `downcast_ref::<ConcreteHost>()`.
    fn as_any(&self) -> &dyn Any;
}
/// Per-call context handed to [`ProcedurePlugin::invoke`].
///
/// Every field is optional and the derived `Default` leaves all of them
/// unset. The struct is `#[non_exhaustive]`, so code outside this crate cannot
/// build it with a struct literal.
#[derive(Default)]
#[non_exhaustive]
pub struct ProcedureContext<'a> {
    /// Borrowed host object, if one was attached.
    pub host: Option<&'a dyn ProcedureHost>,
    /// Deadline for the call, if one was set.
    ///
    /// NOTE(review): stored as a `Duration` rather than an `Instant`; the
    /// reference point it is measured from is not visible here — confirm.
    pub deadline: Option<Duration>,
    /// Borrowed principal, if one was attached.
    pub principal: Option<&'a Principal>,
    /// Zero-sized marker that mentions the `'a` lifetime.
    pub _marker: std::marker::PhantomData<&'a ()>,
}
impl<'a> ProcedureContext<'a> {
    /// Creates a context with no host, deadline or principal (the `Default`
    /// value).
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the context with `host` attached; other fields are carried
    /// over unchanged.
    #[must_use]
    pub fn with_host(self, host: &'a dyn ProcedureHost) -> Self {
        Self {
            host: Some(host),
            ..self
        }
    }

    /// Returns the context with `deadline` set; other fields are carried
    /// over unchanged.
    #[must_use]
    pub fn with_deadline(self, deadline: Duration) -> Self {
        Self {
            deadline: Some(deadline),
            ..self
        }
    }

    /// Returns the context with `principal` attached; other fields are
    /// carried over unchanged.
    #[must_use]
    pub fn with_principal(self, principal: &'a Principal) -> Self {
        Self {
            principal: Some(principal),
            ..self
        }
    }
}
/// Hand-written `Debug`: the host is a trait object with no `Debug` bound, so
/// it is rendered as the placeholder `"<host>"` when present. The `_marker`
/// field is left out of the output.
impl std::fmt::Debug for ProcedureContext<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Some("<host>")` when a host is attached, `None` otherwise.
        let host_placeholder: Option<&str> = self.host.and(Some("<host>"));

        let mut out = f.debug_struct("ProcedureContext");
        out.field("host", &host_placeholder);
        out.field("deadline", &self.deadline);
        out.field("principal", &self.principal);
        out.finish()
    }
}