dora-runtime 0.5.0-rc0

The goal of `dora` is to be a low-latency, composable, and distributed data flow.
Documentation
use super::{OperatorEvent, StopReason};
use aligned_vec::{AVec, ConstAlign};
use dora_core::{
    adjust_shared_library_path,
    config::{DataId, NodeId, OperatorId},
    descriptor::source_is_url,
};
use dora_download::download_file;
use dora_node_api::{
    Event, Parameter,
    arrow_utils::{copy_array_into_sample, required_data_size},
};
use dora_operator_api_types::{
    DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent, DoraResult, DoraStatus,
    Metadata, OnEventResult, Output, SendOutput, safer_ffi::closure::ArcDynFn1,
};
use eyre::{Context, Result, bail, eyre};
use libloading::Symbol;
use std::{
    collections::BTreeMap,
    ffi::c_void,
    panic::{AssertUnwindSafe, catch_unwind},
    path::Path,
    sync::Arc,
};
use tokio::sync::oneshot;
use tracing::{field, span};

pub fn run(
    _node_id: &NodeId,
    _operator_id: &OperatorId,
    source: &str,
    events_tx: flume::Sender<OperatorEvent>,
    incoming_events: flume::Receiver<Event>,
    init_done: oneshot::Sender<Result<()>>,
) -> eyre::Result<()> {
    let path = if source_is_url(source) {
        let target_path = &Path::new("build");
        // try to download the shared library
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(download_file(source, target_path))
            .wrap_err("failed to download shared library operator")?
    } else {
        adjust_shared_library_path(Path::new(source))?
    };

    let library = unsafe {
        libloading::Library::new(&path)
            .wrap_err_with(|| format!("failed to load shared library at `{}`", path.display()))?
    };

    let closure = AssertUnwindSafe(|| {
        let bindings = match Bindings::init(&library) {
            Ok(b) => b,
            Err(err) => {
                let err = err.wrap_err(format!(
                    "failed to init operator bindings from `{}`. \
                        On Windows, ensure that the shared library exports the required symbols \
                        (dora_init_operator, dora_drop_operator, dora_on_event).",
                    path.display()
                ));
                let _ = init_done.send(Err(eyre!("{err:?}")));
                return Err(err);
            }
        };

        let operator = SharedLibraryOperator {
            incoming_events,
            bindings,
            events_tx: events_tx.clone(),
        };

        operator.run(init_done)
    });
    match catch_unwind(closure) {
        Ok(Ok(reason)) => {
            let _ = events_tx.send(OperatorEvent::Finished { reason });
        }
        Ok(Err(err)) => {
            let _ = events_tx.send(OperatorEvent::Error(err));
        }
        Err(panic) => {
            let _ = events_tx.send(OperatorEvent::Panic(panic));
        }
    }

    Ok(())
}

/// Everything needed to drive one operator that was loaded from a shared library.
///
/// The `'lib` lifetime ties the resolved symbols to the loaded
/// `libloading::Library` they were looked up in.
struct SharedLibraryOperator<'lib> {
    /// Events delivered to this operator; consumed by the loop in `run`.
    incoming_events: flume::Receiver<Event>,
    /// Channel on which operator outputs are sent as `OperatorEvent::Output`.
    events_tx: flume::Sender<OperatorEvent>,

    /// Entry points resolved from the loaded library.
    bindings: Bindings<'lib>,
}

impl SharedLibraryOperator<'_> {
    /// Initializes the operator and feeds it events until it stops.
    ///
    /// The operator's init function is called first and its outcome is sent
    /// through `init_done`. Afterwards every event received on
    /// `incoming_events` is converted to a `RawEvent` and passed to the
    /// operator's `on_event` function, together with a callback the operator
    /// can use to emit outputs.
    ///
    /// Returns the reason the loop ended: the event channel was closed, or the
    /// operator asked to stop (itself or all operators).
    ///
    /// # Errors
    ///
    /// Returns an error if the init function reports an error, if an input
    /// cannot be exported through the Arrow FFI, or if `on_event` reports an
    /// error.
    fn run(self, init_done: oneshot::Sender<Result<()>>) -> eyre::Result<StopReason> {
        let operator_context = {
            let DoraInitResult {
                result,
                operator_context,
            } = unsafe { (self.bindings.init_operator.init_operator)() };
            let raw = match result.error {
                Some(error) => {
                    let _ = init_done.send(Err(eyre!(error.to_string())));
                    bail!("init_operator failed: {}", *error)
                }
                None => operator_context,
            };
            // From here on the context is released through the library's drop
            // function when `operator_context` goes out of scope.
            OperatorContext {
                raw,
                drop_fn: self.bindings.drop_operator.clone(),
            }
        };

        let _ = init_done.send(Ok(()));

        // Callback handed to the operator on every `on_event` call; it turns an
        // FFI `Output` into an `OperatorEvent::Output` and forwards it.
        let send_output_closure = Arc::new(move |output: Output| {
            let Output {
                id: output_id,
                data_array,
                schema,
                metadata: Metadata {
                    open_telemetry_context,
                },
            } = output;
            let mut parameters = BTreeMap::new();
            parameters.insert(
                "open_telemetry_context".to_string(),
                Parameter::String(open_telemetry_context.to_string()),
            );

            let arrow_array = match unsafe { arrow::ffi::from_ffi(data_array, &schema) } {
                Ok(a) => a,
                Err(err) => return DoraResult::from_error(err.to_string()),
            };

            // Copy the array into a zero-initialized, 128-byte aligned buffer.
            let total_len = required_data_size(&arrow_array);
            let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);

            let type_info = copy_array_into_sample(&mut sample, &arrow_array);

            let event = OperatorEvent::Output {
                output_id: DataId::from(String::from(output_id)),
                type_info,
                parameters,
                data: Some(sample.into()),
            };

            let result = self
                .events_tx
                .send(event)
                .map_err(|_| eyre!("failed to send output to runtime"));

            match result {
                Ok(()) => DoraResult::SUCCESS,
                Err(_) => DoraResult::from_error("runtime process closed unexpectedly".into()),
            }
        });

        let reason = loop {
            // `event` is only mutated when the `telemetry` feature is enabled.
            #[allow(unused_mut)]
            let Ok(mut event) = self.incoming_events.recv() else {
                break StopReason::InputsClosed;
            };

            let span = span!(tracing::Level::TRACE, "on_event", input_id = field::Empty);
            // Add metadata context if we have a tracer and
            // incoming input has some metadata.
            #[cfg(feature = "telemetry")]
            if let Event::Input {
                id: input_id,
                metadata,
                ..
            } = &mut event
            {
                use dora_tracing::telemetry::{deserialize_context, serialize_context};
                use tracing_opentelemetry::OpenTelemetrySpanExt;
                span.record("input_id", input_id.as_str());

                let otel = metadata.open_telemetry_context();
                let cx = deserialize_context(&otel);
                span.set_parent(cx)
                    .context("failed to set parent span")
                    .unwrap_or_default();
                let cx = span.context();
                let string_cx = serialize_context(&cx);
                metadata.parameters.insert(
                    "open_telemetry_context".to_string(),
                    Parameter::String(string_cx),
                );
            }

            // Keep the guard in a named binding so the span stays entered for
            // the rest of this iteration. `let _ = span.enter()` would drop the
            // guard right away and the operator callback would run outside the
            // span. The span is entered after the telemetry block above, so the
            // parent context is attached before the span becomes active.
            let _span_guard = span.enter();

            let mut operator_event = match event {
                Event::Stop(_) => dora_operator_api_types::RawEvent {
                    input: None,
                    input_closed: None,
                    stop: true,
                    error: None,
                },
                Event::Input {
                    id: input_id,
                    metadata,
                    data,
                } => {
                    let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?;
                    let otel = metadata.open_telemetry_context();
                    let operator_input = dora_operator_api_types::Input {
                        id: String::from(input_id).into(),
                        data_array: Some(data_array),
                        schema,
                        metadata: Metadata {
                            open_telemetry_context: otel.into(),
                        },
                    };
                    dora_operator_api_types::RawEvent {
                        input: Some(Box::new(operator_input).into()),
                        input_closed: None,
                        stop: false,
                        error: None,
                    }
                }
                Event::InputClosed { id: input_id } => dora_operator_api_types::RawEvent {
                    input_closed: Some(input_id.to_string().into()),
                    input: None,
                    stop: false,
                    error: None,
                },
                Event::Reload { .. } => {
                    // Reloading shared lib operator is not supported. See: https://github.com/dora-rs/dora/pull/239#discussion_r1154313139
                    continue;
                }
                Event::Error(err) => dora_operator_api_types::RawEvent {
                    error: Some(err.into()),
                    input_closed: None,
                    input: None,
                    stop: false,
                },
                other => {
                    tracing::warn!("unexpected event: {other:?}");
                    continue;
                }
            };

            let send_output = SendOutput {
                send_output: ArcDynFn1::new(send_output_closure.clone()),
            };
            let OnEventResult {
                result: DoraResult { error },
                status,
            } = unsafe {
                (self.bindings.on_event.on_event)(
                    &mut operator_event,
                    &send_output,
                    operator_context.raw,
                )
            };
            match error {
                Some(error) => bail!("on_input failed: {}", *error),
                None => match status {
                    DoraStatus::Continue => {}
                    DoraStatus::Stop => break StopReason::ExplicitStop,
                    DoraStatus::StopAll => break StopReason::ExplicitStopAll,
                },
            }
        };
        Ok(reason)
    }
}

/// Opaque operator context paired with the library function that releases it.
///
/// Dropping this value calls `drop_fn` on `raw` (see the `Drop` impl).
struct OperatorContext<'lib> {
    /// Context pointer returned by the operator's init function; it is passed
    /// back to the operator on every `on_event` call.
    raw: *mut c_void,
    /// The library's `dora_drop_operator` symbol, called with `raw` on drop.
    drop_fn: Symbol<'lib, DoraDropOperator>,
}

impl Drop for OperatorContext<'_> {
    /// Hands the raw context back to the library's drop function.
    fn drop(&mut self) {
        let Self { raw, drop_fn } = self;
        // SAFETY: `raw` is the pointer the operator's init function returned;
        // the library is trusted to accept it in its own drop function.
        unsafe { (drop_fn.drop_operator)(*raw) };
    }
}

/// The three entry points looked up in an operator library.
///
/// The symbols borrow from the `libloading::Library` for `'lib`.
struct Bindings<'lib> {
    /// `dora_init_operator`: creates the operator context.
    init_operator: Symbol<'lib, DoraInitOperator>,
    /// `dora_drop_operator`: releases the operator context.
    drop_operator: Symbol<'lib, DoraDropOperator>,
    /// `dora_on_event`: called once per incoming event.
    on_event: Symbol<'lib, DoraOnEvent>,
}

impl<'lib> Bindings<'lib> {
    fn init(library: &'lib libloading::Library) -> Result<Self, eyre::Error> {
        let bindings = unsafe {
            Bindings {
                init_operator: library
                    .get(b"dora_init_operator")
                    .wrap_err("failed to get `dora_init_operator`")?,
                drop_operator: library
                    .get(b"dora_drop_operator")
                    .wrap_err("failed to get `dora_drop_operator`")?,
                on_event: library
                    .get(b"dora_on_event")
                    .wrap_err("failed to get `dora_on_event`")?,
            }
        };
        Ok(bindings)
    }
}