use std::collections::BTreeMap;
use std::fmt;
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use serde_json::value::RawValue;
use crate::protocol::{
DataRow, Diagnostic, StreamSummary, WorkerCapabilityFact, WorkerCapabilityMetadata, WorkerCommandMetadata,
WorkerDiagnostic, WorkerDoctorDiagnostic, WorkerDoctorReport, WorkerDoctorSeverity, WorkerElabOptions,
WorkerElabOutcome, WorkerKernelOutcome, WorkerKernelStatus,
};
use crate::supervisor::{LeanWorker, LeanWorkerError};
/// Describes the Lean project a worker session is opened against.
///
/// Stores the project root directory, the package name, the library name and
/// the list of imports. All fields are fixed at construction time and are
/// only readable through crate-internal accessors.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerSessionConfig {
    project_root: PathBuf,
    package: String,
    lib_name: String,
    imports: Vec<String>,
}

impl LeanWorkerSessionConfig {
    /// Builds a configuration, converting every argument into its owned form.
    ///
    /// `imports` accepts any iterable of string-like items; the iteration
    /// order is preserved in the stored list.
    pub fn new(
        project_root: impl Into<PathBuf>,
        package: impl Into<String>,
        lib_name: impl Into<String>,
        imports: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        let project_root: PathBuf = project_root.into();
        let package: String = package.into();
        let lib_name: String = lib_name.into();
        let imports: Vec<String> = imports.into_iter().map(|module| module.into()).collect();
        Self {
            project_root,
            package,
            lib_name,
            imports,
        }
    }

    /// Returns the project root as an owned string; non-UTF-8 path data is
    /// converted lossily.
    pub(crate) fn project_root_string(&self) -> String {
        String::from(self.project_root.to_string_lossy())
    }

    /// Returns the package name.
    pub(crate) fn package(&self) -> &str {
        self.package.as_str()
    }

    /// Returns the library name.
    pub(crate) fn lib_name(&self) -> &str {
        self.lib_name.as_str()
    }

    /// Returns the imports in the order they were supplied.
    pub(crate) fn imports(&self) -> &[String] {
        self.imports.as_slice()
    }
}
/// Builder-style options sent to the worker together with a source snippet.
///
/// Start from [`LeanWorkerElabOptions::new`] (or `Default`) and chain the
/// setters; each setter consumes the value and returns the updated copy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerElabOptions {
    namespace_context: String,
    file_label: String,
    heartbeat_limit: u64,
    diagnostic_byte_limit: usize,
}

impl LeanWorkerElabOptions {
    /// Returns the default options; equivalent to `Default::default()`.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Replaces the namespace context with `namespace`.
    #[must_use]
    pub fn namespace_context(self, namespace: &str) -> Self {
        Self {
            namespace_context: namespace.to_owned(),
            ..self
        }
    }

    /// Replaces the file label with `label`.
    #[must_use]
    pub fn file_label(self, label: &str) -> Self {
        Self {
            file_label: label.to_owned(),
            ..self
        }
    }

    /// Replaces the heartbeat limit with `heartbeats`.
    #[must_use]
    pub fn heartbeat_limit(self, heartbeats: u64) -> Self {
        Self {
            heartbeat_limit: heartbeats,
            ..self
        }
    }

    /// Replaces the diagnostic byte limit with `bytes`.
    #[must_use]
    pub fn diagnostic_byte_limit(self, bytes: usize) -> Self {
        Self {
            diagnostic_byte_limit: bytes,
            ..self
        }
    }

    /// Copies the options field-for-field into their wire representation.
    pub(crate) fn wire(&self) -> WorkerElabOptions {
        let Self {
            namespace_context,
            file_label,
            heartbeat_limit,
            diagnostic_byte_limit,
        } = self;
        WorkerElabOptions {
            namespace_context: namespace_context.clone(),
            file_label: file_label.clone(),
            heartbeat_limit: *heartbeat_limit,
            diagnostic_byte_limit: *diagnostic_byte_limit,
        }
    }
}

impl Default for LeanWorkerElabOptions {
    /// Empty namespace context, the `<elaborate>` file label, and the
    /// default limits exported by `lean_rs_host`.
    fn default() -> Self {
        let namespace_context = String::new();
        let file_label = String::from("<elaborate>");
        Self {
            namespace_context,
            file_label,
            heartbeat_limit: lean_rs_host::LEAN_HEARTBEAT_LIMIT_DEFAULT,
            diagnostic_byte_limit: lean_rs_host::LEAN_DIAGNOSTIC_BYTE_LIMIT_DEFAULT,
        }
    }
}
/// Version information describing a worker runtime.
///
/// NOTE(review): this type is not constructed in the visible part of the
/// file; the field descriptions below are inferred from the field names and
/// should be confirmed against the code that produces it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerRuntimeMetadata {
/// Version string of the worker.
pub worker_version: String,
/// Protocol version number.
pub protocol_version: u16,
/// Lean version string, when one is known.
pub lean_version: Option<String>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerCapabilityMetadata {
pub commands: Vec<LeanWorkerCommandMetadata>,
pub capabilities: Vec<LeanWorkerCapabilityFact>,
pub lean_version: Option<String>,
pub extra: Option<Value>,
}
impl From<WorkerCapabilityMetadata> for LeanWorkerCapabilityMetadata {
fn from(value: WorkerCapabilityMetadata) -> Self {
Self {
commands: value.commands.into_iter().map(Into::into).collect(),
capabilities: value.capabilities.into_iter().map(Into::into).collect(),
lean_version: value.lean_version,
extra: value.extra,
}
}
}
/// Name and version of one command listed in the capability metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerCommandMetadata {
    /// Command name as sent on the wire.
    pub name: String,
    /// Command version as sent on the wire.
    pub version: String,
}

impl From<WorkerCommandMetadata> for LeanWorkerCommandMetadata {
    /// Moves both strings out of the wire value unchanged.
    fn from(value: WorkerCommandMetadata) -> Self {
        let name = value.name;
        let version = value.version;
        Self { name, version }
    }
}
/// Name and version of one capability listed in the capability metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerCapabilityFact {
    /// Capability name as sent on the wire.
    pub name: String,
    /// Capability version as sent on the wire.
    pub version: String,
}

impl From<WorkerCapabilityFact> for LeanWorkerCapabilityFact {
    /// Moves both strings out of the wire value unchanged.
    fn from(value: WorkerCapabilityFact) -> Self {
        let name = value.name;
        let version = value.version;
        Self { name, version }
    }
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum LeanWorkerDoctorSeverity {
Pass,
Warning,
Error,
}
impl From<WorkerDoctorSeverity> for LeanWorkerDoctorSeverity {
fn from(value: WorkerDoctorSeverity) -> Self {
match value {
WorkerDoctorSeverity::Pass => Self::Pass,
WorkerDoctorSeverity::Warning => Self::Warning,
WorkerDoctorSeverity::Error => Self::Error,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerDoctorDiagnostic {
pub severity: LeanWorkerDoctorSeverity,
pub code: String,
pub message: String,
pub details: Option<Value>,
}
impl From<WorkerDoctorDiagnostic> for LeanWorkerDoctorDiagnostic {
fn from(value: WorkerDoctorDiagnostic) -> Self {
Self {
severity: value.severity.into(),
code: value.code,
message: value.message,
details: value.details,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerDoctorReport {
pub diagnostics: Vec<LeanWorkerDoctorDiagnostic>,
pub metadata: Option<Value>,
}
impl From<WorkerDoctorReport> for LeanWorkerDoctorReport {
fn from(value: WorkerDoctorReport) -> Self {
Self {
diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
metadata: value.metadata,
}
}
}
/// Shared cancellation flag.
///
/// The flag lives behind an `Arc`, so every clone of a token observes and
/// controls the same flag. Once set, the flag is never cleared by this type.
#[derive(Clone, Debug, Default)]
pub struct LeanWorkerCancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl LeanWorkerCancellationToken {
    /// Creates a token whose flag is not set.
    #[must_use]
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Sets the flag (release store); visible through every clone.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Release);
    }

    /// Reports whether the flag has been set (acquire load).
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::Acquire)
    }
}
/// Progress notification delivered to a [`LeanWorkerProgressSink`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerProgressEvent {
/// Name of the phase the event belongs to.
pub phase: String,
/// Current progress counter.
pub current: u64,
/// Total for the counter, when one is known.
pub total: Option<u64>,
/// Elapsed time associated with the event; `elapsed_event` fills this from
/// `Instant::elapsed` of the supplied start instant.
pub elapsed: Duration,
}
/// Receiver for progress events.
///
/// Implementations must be `Send + Sync` and are called through `&self`.
/// `report_parent_progress` invokes the sink inside `catch_unwind`, so a
/// panicking sink is turned into an error rather than unwinding further.
pub trait LeanWorkerProgressSink: Send + Sync {
/// Handles one progress event.
fn report(&self, event: LeanWorkerProgressEvent);
}
/// One stream row whose payload has been parsed into a JSON [`Value`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerDataRow {
    /// Stream name, copied from the wire row.
    pub stream: String,
    /// Sequence number, copied from the wire row.
    pub sequence: u64,
    /// Parsed JSON payload.
    pub payload: Value,
}

impl TryFrom<DataRow> for LeanWorkerDataRow {
    type Error = LeanWorkerError;

    /// Parses the raw JSON payload of a wire row.
    ///
    /// # Errors
    ///
    /// Returns `LeanWorkerError::Protocol` when the payload text is not
    /// valid JSON.
    fn try_from(value: DataRow) -> Result<Self, Self::Error> {
        match serde_json::from_str(value.payload.get()) {
            Ok(payload) => Ok(Self {
                stream: value.stream,
                sequence: value.sequence,
                payload,
            }),
            Err(err) => Err(LeanWorkerError::Protocol {
                message: format!("worker data-row payload decode failed: {err}"),
            }),
        }
    }
}
/// Receiver for stream rows whose payload is a parsed JSON value.
///
/// Implementations must be `Send + Sync` and are called through `&self`.
/// `report_value_data_row` invokes the sink inside `catch_unwind`, so a
/// panicking sink is turned into an error rather than unwinding further.
pub trait LeanWorkerDataSink: Send + Sync {
/// Handles one data row.
fn report(&self, row: LeanWorkerDataRow);
}
/// Crate-internal stream row that keeps its payload as unparsed JSON text.
pub(crate) struct LeanWorkerRawDataRow {
    /// Stream name, copied from the wire row.
    pub(crate) stream: String,
    /// Sequence number, copied from the wire row.
    pub(crate) sequence: u64,
    /// Payload exactly as received, not yet parsed.
    pub(crate) payload: Box<RawValue>,
}

impl From<DataRow> for LeanWorkerRawDataRow {
    /// Moves all three fields across without touching the payload.
    fn from(value: DataRow) -> Self {
        let stream = value.stream;
        let sequence = value.sequence;
        let payload = value.payload;
        Self {
            stream,
            sequence,
            payload,
        }
    }
}
/// Crate-internal receiver for rows whose payload is still raw JSON text.
///
/// Implementations must be `Send + Sync` and are called through `&self`.
pub(crate) trait LeanWorkerRawDataSink: Send + Sync {
/// Handles one raw data row.
fn report(&self, row: LeanWorkerRawDataRow);
}
/// Selects which kind of row sink `report_parent_data_row` delivers to.
#[derive(Clone, Copy)]
pub(crate) enum LeanWorkerDataSinkTarget<'a> {
/// Rows are parsed into `LeanWorkerDataRow` before being delivered.
Value(&'a dyn LeanWorkerDataSink),
/// Rows are delivered as `LeanWorkerRawDataRow`, payload left unparsed.
Raw(&'a dyn LeanWorkerRawDataSink),
}
/// Diagnostic delivered to a [`LeanWorkerDiagnosticSink`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerDiagnosticEvent {
    /// Diagnostic code, copied from the wire value.
    pub code: String,
    /// Diagnostic message, copied from the wire value.
    pub message: String,
}

impl From<Diagnostic> for LeanWorkerDiagnosticEvent {
    /// Moves both strings out of the wire value unchanged.
    fn from(value: Diagnostic) -> Self {
        let code = value.code;
        let message = value.message;
        Self { code, message }
    }
}
/// Receiver for diagnostic events.
///
/// Implementations must be `Send + Sync` and are called through `&self`.
/// `report_parent_diagnostic` invokes the sink inside `catch_unwind`, so a
/// panicking sink is turned into an error rather than unwinding further.
pub trait LeanWorkerDiagnosticSink: Send + Sync {
/// Handles one diagnostic event.
fn report(&self, diagnostic: LeanWorkerDiagnosticEvent);
}
/// Summary returned once a data stream has finished.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerStreamSummary {
    /// Total row count, copied from the wire summary.
    pub total_rows: u64,
    /// Row count per stream name, copied from the wire summary.
    pub per_stream_counts: BTreeMap<String, u64>,
    /// Elapsed time, built from the wire summary's microsecond count.
    pub elapsed: Duration,
    /// Optional untyped JSON metadata, copied from the wire summary.
    pub metadata: Option<Value>,
}

impl From<StreamSummary> for LeanWorkerStreamSummary {
    /// Converts the wire summary; `elapsed_micros` becomes a [`Duration`].
    fn from(value: StreamSummary) -> Self {
        let elapsed = Duration::from_micros(value.elapsed_micros);
        Self {
            total_rows: value.total_rows,
            per_stream_counts: value.per_stream_counts,
            elapsed,
            metadata: value.metadata,
        }
    }
}
/// Typed handle for a JSON request/response command identified by an export
/// name.
///
/// `Req` and `Resp` exist only at the type level (via `PhantomData`); no
/// values of those types are stored. `Clone`, `Debug`, `PartialEq` and `Eq`
/// are implemented by hand so they place no bounds on `Req` or `Resp`, and
/// they look at the export name only.
pub struct LeanWorkerJsonCommand<Req, Resp> {
    export: String,
    _types: PhantomData<fn(&Req) -> Resp>,
}

impl<Req, Resp> LeanWorkerJsonCommand<Req, Resp> {
    /// Creates a command handle for the given export name.
    #[must_use]
    pub fn new(export: impl Into<String>) -> Self {
        let export: String = export.into();
        Self {
            export,
            _types: PhantomData,
        }
    }

    /// Returns the export name this handle was created with.
    #[must_use]
    pub fn export(&self) -> &str {
        self.export.as_str()
    }
}

impl<Req, Resp> Clone for LeanWorkerJsonCommand<Req, Resp> {
    fn clone(&self) -> Self {
        Self::new(self.export.clone())
    }
}

impl<Req, Resp> fmt::Debug for LeanWorkerJsonCommand<Req, Resp> {
    /// Prints the export name only; the marker field is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("LeanWorkerJsonCommand");
        out.field("export", &self.export);
        out.finish()
    }
}

impl<Req, Resp> PartialEq for LeanWorkerJsonCommand<Req, Resp> {
    /// Two handles are equal when their export names are equal.
    fn eq(&self, other: &Self) -> bool {
        self.export() == other.export()
    }
}

impl<Req, Resp> Eq for LeanWorkerJsonCommand<Req, Resp> {}
/// Typed handle for a streaming command identified by an export name.
///
/// `Req`, `Row` and `Summary` exist only at the type level (via
/// `PhantomData`); no values of those types are stored. `Clone`, `Debug`,
/// `PartialEq` and `Eq` are implemented by hand so they place no bounds on
/// the type parameters, and they look at the export name only.
pub struct LeanWorkerStreamingCommand<Req, Row, Summary> {
    export: String,
    _types: PhantomData<fn(&Req) -> (Row, Summary)>,
}

impl<Req, Row, Summary> LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Creates a command handle for the given export name.
    #[must_use]
    pub fn new(export: impl Into<String>) -> Self {
        let export: String = export.into();
        Self {
            export,
            _types: PhantomData,
        }
    }

    /// Returns the export name this handle was created with.
    #[must_use]
    pub fn export(&self) -> &str {
        self.export.as_str()
    }
}

impl<Req, Row, Summary> Clone for LeanWorkerStreamingCommand<Req, Row, Summary> {
    fn clone(&self) -> Self {
        Self::new(self.export.clone())
    }
}

impl<Req, Row, Summary> fmt::Debug for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Prints the export name only; the marker field is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("LeanWorkerStreamingCommand");
        out.field("export", &self.export);
        out.finish()
    }
}

impl<Req, Row, Summary> PartialEq for LeanWorkerStreamingCommand<Req, Row, Summary> {
    /// Two handles are equal when their export names are equal.
    fn eq(&self, other: &Self) -> bool {
        self.export() == other.export()
    }
}

impl<Req, Row, Summary> Eq for LeanWorkerStreamingCommand<Req, Row, Summary> {}
/// Stream row whose payload has been decoded into the caller's `Row` type.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerTypedDataRow<Row> {
/// Stream name of the row.
pub stream: String,
/// Sequence number of the row.
pub sequence: u64,
/// Decoded payload.
pub payload: Row,
}
/// Receiver for rows decoded into the caller's `Row` type.
///
/// Implementations must be `Send + Sync` and are called through `&self`.
pub trait LeanWorkerTypedDataSink<Row>: Send + Sync {
/// Handles one decoded row.
fn report(&self, row: LeanWorkerTypedDataRow<Row>);
}
/// Stream summary whose metadata has been decoded into the caller's
/// `Summary` type; returned by `LeanWorkerSession::run_streaming_command`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerTypedStreamSummary<Summary> {
/// Total row count, copied from the untyped summary.
pub total_rows: u64,
/// Row count per stream name, copied from the untyped summary.
pub per_stream_counts: BTreeMap<String, u64>,
/// Elapsed time, copied from the untyped summary.
pub elapsed: Duration,
/// Decoded metadata; `None` when the untyped summary carried none.
pub metadata: Option<Summary>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerElabResult {
pub success: bool,
pub diagnostics: Vec<LeanWorkerDiagnostic>,
pub truncated: bool,
}
impl From<WorkerElabOutcome> for LeanWorkerElabResult {
fn from(value: WorkerElabOutcome) -> Self {
Self {
success: value.success,
diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
truncated: value.truncated,
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum LeanWorkerKernelStatus {
Checked,
Rejected,
Unavailable,
Unsupported,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerKernelResult {
pub status: LeanWorkerKernelStatus,
pub diagnostics: Vec<LeanWorkerDiagnostic>,
pub truncated: bool,
}
impl From<WorkerKernelOutcome> for LeanWorkerKernelResult {
fn from(value: WorkerKernelOutcome) -> Self {
Self {
status: match value.status {
WorkerKernelStatus::Checked => LeanWorkerKernelStatus::Checked,
WorkerKernelStatus::Rejected => LeanWorkerKernelStatus::Rejected,
WorkerKernelStatus::Unavailable => LeanWorkerKernelStatus::Unavailable,
WorkerKernelStatus::Unsupported => LeanWorkerKernelStatus::Unsupported,
},
diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
truncated: value.truncated,
}
}
}
/// One diagnostic attached to an elaboration or kernel-check result.
///
/// Every field is copied unchanged from the wire diagnostic.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerDiagnostic {
    /// Severity as a free-form string.
    pub severity: String,
    /// Diagnostic message text.
    pub message: String,
    /// File label the diagnostic refers to.
    pub file_label: String,
    /// Start line, when present.
    pub line: Option<u32>,
    /// Start column, when present.
    pub column: Option<u32>,
    /// End line, when present.
    pub end_line: Option<u32>,
    /// End column, when present.
    pub end_column: Option<u32>,
}

impl From<WorkerDiagnostic> for LeanWorkerDiagnostic {
    /// Moves every field out of the wire diagnostic unchanged.
    fn from(value: WorkerDiagnostic) -> Self {
        let severity = value.severity;
        let message = value.message;
        let file_label = value.file_label;
        let (line, column) = (value.line, value.column);
        let (end_line, end_column) = (value.end_line, value.end_column);
        Self {
            severity,
            message,
            file_label,
            line,
            column,
            end_line,
            end_column,
        }
    }
}
/// Handle for a session opened on a [`LeanWorker`].
///
/// Borrows the worker exclusively for the lifetime `'worker`; requests made
/// through the session are forwarded to that worker.
pub struct LeanWorkerSession<'worker> {
// Worker that every request of this session is forwarded to.
worker: &'worker mut LeanWorker,
// Set to `false` when a request ends in cancellation or timeout;
// `ensure_open` then rejects all further requests on this session.
open: bool,
}
impl LeanWorker {
/// Opens a session on this worker and returns a handle that borrows the
/// worker exclusively.
///
/// `config`, `cancellation` and `progress` are passed unchanged to
/// `open_worker_session`; the returned session starts in the open state.
///
/// # Errors
///
/// Propagates any error returned by `open_worker_session`.
pub fn open_session<'worker>(
&'worker mut self,
config: &LeanWorkerSessionConfig,
cancellation: Option<&LeanWorkerCancellationToken>,
progress: Option<&dyn LeanWorkerProgressSink>,
) -> Result<LeanWorkerSession<'worker>, LeanWorkerError> {
self.open_worker_session(config, cancellation, progress)?;
Ok(LeanWorkerSession {
worker: self,
open: true,
})
}
}
/// Request methods of an open session.
///
/// Every request first checks that the session is still open. A request that
/// ends in `LeanWorkerError::Cancelled` or `LeanWorkerError::Timeout` closes
/// the session; all later requests then fail with
/// `LeanWorkerError::UnsupportedRequest` carrying the operation
/// `"worker_session_invalidated"`. Any other error leaves the session open.
impl LeanWorkerSession<'_> {
    /// Returns the request timeout reported by the underlying worker.
    #[must_use]
    pub fn request_timeout(&self) -> Duration {
        self.worker.request_timeout()
    }

    /// Sets the request timeout on the underlying worker.
    pub fn set_request_timeout(&mut self, timeout: Duration) {
        self.worker.set_request_timeout(timeout);
    }

    /// Forwards `source` and `options` to the worker's elaborate request.
    ///
    /// # Errors
    ///
    /// Fails if the session was already invalidated, or with whatever error
    /// the worker returns. Cancellation and timeout invalidate the session.
    pub fn elaborate(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self.worker.worker_elaborate(source, options, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Forwards `source` and `options` to the worker's kernel-check request.
    ///
    /// # Errors
    ///
    /// Fails if the session was already invalidated, or with whatever error
    /// the worker returns. Cancellation and timeout invalidate the session.
    pub fn kernel_check(
        &mut self,
        source: &str,
        options: &LeanWorkerElabOptions,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self
            .worker
            .worker_kernel_check(source, options, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Forwards `names` to the worker's declaration-kinds request and
    /// returns the strings it answers with.
    ///
    /// # Errors
    ///
    /// Fails if the session was already invalidated, or with whatever error
    /// the worker returns. Cancellation and timeout invalidate the session.
    pub fn declaration_kinds(
        &mut self,
        names: &[&str],
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Vec<String>, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self.worker.worker_declaration_kinds(names, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Forwards `names` to the worker's declaration-names request and
    /// returns the strings it answers with.
    ///
    /// # Errors
    ///
    /// Fails if the session was already invalidated, or with whatever error
    /// the worker returns. Cancellation and timeout invalidate the session.
    pub fn declaration_names(
        &mut self,
        names: &[&str],
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Vec<String>, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self.worker.worker_declaration_names(names, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Runs the streaming export `export` with an untyped JSON request,
    /// handing rows to `rows` and diagnostics to `diagnostics`.
    ///
    /// # Errors
    ///
    /// Fails if the session was already invalidated, or with whatever error
    /// the worker returns. Cancellation and timeout invalidate the session.
    pub fn run_data_stream(
        &mut self,
        export: &str,
        request: &Value,
        rows: &dyn LeanWorkerDataSink,
        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self
            .worker
            .worker_run_data_stream(export, request, rows, diagnostics, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Same as [`Self::run_data_stream`], but rows reach the sink with their
    /// payload still unparsed.
    fn run_data_stream_raw(
        &mut self,
        export: &str,
        request: &Value,
        rows: &dyn LeanWorkerRawDataSink,
        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self
            .worker
            .worker_run_data_stream_raw(export, request, rows, diagnostics, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Serializes `request` to JSON, runs the command's export, and decodes
    /// the JSON response into `Resp`.
    ///
    /// # Errors
    ///
    /// * `TypedCommandRequestEncode` if `request` cannot be serialized.
    /// * `TypedCommandResponseDecode` if the response cannot be decoded.
    /// * The invalidated-session error, or whatever error the worker
    ///   returns. Cancellation and timeout invalidate the session; encode
    ///   and decode failures do not.
    pub fn run_json_command<Req, Resp>(
        &mut self,
        command: &LeanWorkerJsonCommand<Req, Resp>,
        request: &Req,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<Resp, LeanWorkerError>
    where
        Req: Serialize,
        Resp: DeserializeOwned,
    {
        self.ensure_open()?;
        let export = command.export();
        let request_json =
            serde_json::to_string(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
                export: export.to_owned(),
                message: err.to_string(),
            })?;
        let outcome = self
            .worker
            .worker_json_command(export, request_json, cancellation, progress);
        let response_json = self.close_if_interrupted(outcome)?;
        serde_json::from_str(&response_json).map_err(|err| LeanWorkerError::TypedCommandResponseDecode {
            export: export.to_owned(),
            message: err.to_string(),
        })
    }

    /// Runs a typed streaming command: serializes `request`, decodes each
    /// row into `Row` before handing it to `rows`, and decodes the summary
    /// metadata into `Summary`.
    ///
    /// A row that fails to decode is recorded and the stream is asked to
    /// stop by cancelling the cancellation token in use. When the caller
    /// supplies `cancellation`, that is the caller's own token, so it will
    /// read as cancelled afterwards.
    ///
    /// # Errors
    ///
    /// * `TypedCommandRequestEncode` if `request` cannot be serialized.
    /// * The recorded row decode error, if any row failed to decode and the
    ///   stream then finished or was cancelled.
    /// * `TypedCommandSummaryDecode` if the summary metadata cannot be
    ///   decoded.
    /// * `Cancelled` (operation `"worker_run_data_stream"`) when the stream
    ///   was cancelled without a row decode failure.
    /// * The invalidated-session error, or any other worker error.
    pub fn run_streaming_command<Req, Row, Summary>(
        &mut self,
        command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
        request: &Req,
        rows: &dyn LeanWorkerTypedDataSink<Row>,
        diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
    where
        Req: Serialize,
        Row: DeserializeOwned,
        Summary: DeserializeOwned,
    {
        self.ensure_open()?;
        let export = command.export();
        let request_value =
            serde_json::to_value(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
                export: export.to_owned(),
                message: err.to_string(),
            })?;
        // The decoding sink needs a token to stop the stream with, so fall
        // back to a private one when the caller did not provide any.
        let internal_cancellation = LeanWorkerCancellationToken::new();
        let cancellation_for_stream = cancellation.unwrap_or(&internal_cancellation);
        let typed_sink = TypedRawDataSink {
            export,
            rows,
            cancellation: cancellation_for_stream,
            decode_error: std::sync::Mutex::new(None),
        };
        let outcome = self.run_data_stream_raw(
            export,
            &request_value,
            &typed_sink,
            diagnostics,
            Some(cancellation_for_stream),
            progress,
        );
        match outcome {
            Ok(summary) => {
                // The stream may finish before the cancellation request is
                // noticed; the decode failure still wins.
                if let Some(err) = typed_sink.take_decode_error() {
                    return Err(err);
                }
                let metadata = summary
                    .metadata
                    .map(|metadata| {
                        serde_json::from_value(metadata).map_err(|err| LeanWorkerError::TypedCommandSummaryDecode {
                            export: export.to_owned(),
                            message: err.to_string(),
                        })
                    })
                    .transpose()?;
                Ok(LeanWorkerTypedStreamSummary {
                    total_rows: summary.total_rows,
                    per_stream_counts: summary.per_stream_counts,
                    elapsed: summary.elapsed,
                    metadata,
                })
            }
            Err(LeanWorkerError::Cancelled { .. }) => {
                // A decode failure is the root cause of the cancellation it
                // triggered, so report it in preference to `Cancelled`.
                if let Some(err) = typed_sink.take_decode_error() {
                    Err(err)
                } else {
                    self.open = false;
                    Err(LeanWorkerError::Cancelled {
                        operation: "worker_run_data_stream",
                    })
                }
            }
            Err(err) => Err(err),
        }
    }

    /// Forwards `export` and `request` to the worker's capability-metadata
    /// request.
    ///
    /// # Errors
    ///
    /// Fails if the session was already invalidated, or with whatever error
    /// the worker returns. Cancellation and timeout invalidate the session.
    pub fn capability_metadata(
        &mut self,
        export: &str,
        request: &Value,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self
            .worker
            .worker_capability_metadata(export, request, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Forwards `export` and `request` to the worker's capability-doctor
    /// request.
    ///
    /// # Errors
    ///
    /// Fails if the session was already invalidated, or with whatever error
    /// the worker returns. Cancellation and timeout invalidate the session.
    pub fn capability_doctor(
        &mut self,
        export: &str,
        request: &Value,
        cancellation: Option<&LeanWorkerCancellationToken>,
        progress: Option<&dyn LeanWorkerProgressSink>,
    ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
        self.ensure_open()?;
        let outcome = self
            .worker
            .worker_capability_doctor(export, request, cancellation, progress);
        self.close_if_interrupted(outcome)
    }

    /// Rejects the request when an earlier one invalidated this session.
    fn ensure_open(&self) -> Result<(), LeanWorkerError> {
        if self.open {
            Ok(())
        } else {
            Err(LeanWorkerError::UnsupportedRequest {
                operation: "worker_session_invalidated",
            })
        }
    }

    /// Marks the session closed when `outcome` is a cancellation or a
    /// timeout, then hands `outcome` back unchanged.
    fn close_if_interrupted<T>(&mut self, outcome: Result<T, LeanWorkerError>) -> Result<T, LeanWorkerError> {
        if matches!(
            &outcome,
            Err(LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })
        ) {
            self.open = false;
        }
        outcome
    }
}
/// Adapter that decodes raw rows into `Row` and forwards them to a typed
/// sink.
///
/// The first row that fails to decode is stored in `decode_error` and
/// `cancellation` is cancelled so the stream can be stopped.
struct TypedRawDataSink<'a, Row> {
    /// Export name, used only to label a decode error.
    export: &'a str,
    /// Typed sink that receives successfully decoded rows.
    rows: &'a dyn LeanWorkerTypedDataSink<Row>,
    /// Token cancelled when a row fails to decode.
    cancellation: &'a LeanWorkerCancellationToken,
    /// First decode failure, if any; taken by the caller once the stream ends.
    decode_error: std::sync::Mutex<Option<LeanWorkerError>>,
}

impl<Row> TypedRawDataSink<'_, Row> {
    /// Removes and returns the recorded decode failure, if there is one.
    ///
    /// A poisoned lock is recovered rather than treated as "no error", so a
    /// recorded failure is never silently lost.
    fn take_decode_error(&self) -> Option<LeanWorkerError> {
        let mut slot = self
            .decode_error
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        slot.take()
    }
}

impl<Row> LeanWorkerRawDataSink for TypedRawDataSink<'_, Row>
where
    Row: DeserializeOwned,
{
    /// Decodes the row payload and forwards the typed row; on failure
    /// records the error and cancels the token.
    fn report(&self, row: LeanWorkerRawDataRow) {
        match serde_json::from_str(row.payload.get()) {
            Ok(payload) => self.rows.report(LeanWorkerTypedDataRow {
                stream: row.stream,
                sequence: row.sequence,
                payload,
            }),
            Err(err) => {
                let failure = LeanWorkerError::TypedCommandRowDecode {
                    export: self.export.to_owned(),
                    stream: row.stream,
                    sequence: row.sequence,
                    message: err.to_string(),
                };
                {
                    let mut slot = self
                        .decode_error
                        .lock()
                        .unwrap_or_else(std::sync::PoisonError::into_inner);
                    // Rows can keep arriving after cancellation was requested;
                    // keep the first failure because it is the root cause.
                    if slot.is_none() {
                        *slot = Some(failure);
                    }
                }
                self.cancellation.cancel();
            }
        }
    }
}
/// Returns `LeanWorkerError::Cancelled` tagged with `operation` when a token
/// is supplied and has been cancelled; otherwise returns `Ok(())`.
pub(crate) fn check_cancelled(
    operation: &'static str,
    token: Option<&LeanWorkerCancellationToken>,
) -> Result<(), LeanWorkerError> {
    match token {
        Some(active) if active.is_cancelled() => Err(LeanWorkerError::Cancelled { operation }),
        _ => Ok(()),
    }
}
/// Extracts a readable message from a panic payload.
///
/// Payloads that are a `&str` or a `String` are returned as-is; any other
/// payload type yields `fallback`.
fn sink_panic_message(payload: &(dyn std::any::Any + Send), fallback: &str) -> String {
    if let Some(text) = payload.downcast_ref::<&str>() {
        (*text).to_owned()
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.clone()
    } else {
        fallback.to_owned()
    }
}

/// Delivers `event` to the progress sink, if one was supplied.
///
/// # Errors
///
/// Returns `LeanWorkerError::ProgressPanic` when the sink panics; the panic
/// is caught here and does not unwind into the caller.
pub(crate) fn report_parent_progress(
    sink: Option<&dyn LeanWorkerProgressSink>,
    event: LeanWorkerProgressEvent,
) -> Result<(), LeanWorkerError> {
    let Some(sink) = sink else {
        return Ok(());
    };
    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(event))).map_err(|payload| {
        LeanWorkerError::ProgressPanic {
            message: sink_panic_message(&*payload, "worker progress sink panicked"),
        }
    })
}

/// Delivers a wire data row to the selected row sink.
///
/// For a `Value` target the payload is parsed into JSON first; for a `Raw`
/// target the row is forwarded with its payload unparsed.
///
/// # Errors
///
/// * `LeanWorkerError::Protocol` when no sink was supplied, or when the
///   payload of a `Value`-target row is not valid JSON.
/// * `LeanWorkerError::DataSinkPanic` when the sink panics.
pub(crate) fn report_parent_data_row(
    sink: Option<LeanWorkerDataSinkTarget<'_>>,
    row: DataRow,
) -> Result<(), LeanWorkerError> {
    let Some(sink) = sink else {
        return Err(LeanWorkerError::Protocol {
            message: "worker sent data row for a request without a row sink".to_owned(),
        });
    };
    match sink {
        LeanWorkerDataSinkTarget::Value(sink) => {
            let row = LeanWorkerDataRow::try_from(row)?;
            report_value_data_row(sink, row)
        }
        LeanWorkerDataSinkTarget::Raw(sink) => report_raw_data_row(sink, row.into()),
    }
}

/// Calls a value row sink, converting a panic into
/// `LeanWorkerError::DataSinkPanic`.
fn report_value_data_row(sink: &dyn LeanWorkerDataSink, row: LeanWorkerDataRow) -> Result<(), LeanWorkerError> {
    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
        LeanWorkerError::DataSinkPanic {
            message: sink_panic_message(&*payload, "worker data sink panicked"),
        }
    })
}

/// Calls a raw row sink, converting a panic into
/// `LeanWorkerError::DataSinkPanic`.
fn report_raw_data_row(sink: &dyn LeanWorkerRawDataSink, row: LeanWorkerRawDataRow) -> Result<(), LeanWorkerError> {
    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
        LeanWorkerError::DataSinkPanic {
            message: sink_panic_message(&*payload, "worker data sink panicked"),
        }
    })
}

/// Delivers `diagnostic` to the diagnostic sink, if one was supplied.
///
/// # Errors
///
/// Returns `LeanWorkerError::DiagnosticSinkPanic` when the sink panics; the
/// panic is caught here and does not unwind into the caller.
pub(crate) fn report_parent_diagnostic(
    sink: Option<&dyn LeanWorkerDiagnosticSink>,
    diagnostic: LeanWorkerDiagnosticEvent,
) -> Result<(), LeanWorkerError> {
    let Some(sink) = sink else {
        return Ok(());
    };
    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(diagnostic))).map_err(|payload| {
        LeanWorkerError::DiagnosticSinkPanic {
            message: sink_panic_message(&*payload, "worker diagnostic sink panicked"),
        }
    })
}
/// Builds a progress event whose `elapsed` field is the time that has passed
/// since `started`, measured at the moment of the call.
pub(crate) fn elapsed_event(
    phase: String,
    current: u64,
    total: Option<u64>,
    started: Instant,
) -> LeanWorkerProgressEvent {
    let elapsed = started.elapsed();
    LeanWorkerProgressEvent {
        phase,
        current,
        total,
        elapsed,
    }
}