1use std::collections::BTreeMap;
9use std::fmt;
10use std::marker::PhantomData;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::time::{Duration, Instant};
15
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use serde_json::Value;
19use serde_json::value::RawValue;
20
21use crate::protocol::{
22 DataRow, Diagnostic, StreamSummary, WorkerCapabilityFact, WorkerCapabilityMetadata, WorkerCommandMetadata,
23 WorkerDiagnostic, WorkerDoctorDiagnostic, WorkerDoctorReport, WorkerDoctorSeverity, WorkerElabOptions,
24 WorkerElabOutcome, WorkerKernelOutcome, WorkerKernelStatus,
25};
26use crate::supervisor::{LeanWorker, LeanWorkerError};
27
28#[derive(Clone, Debug, Eq, PartialEq)]
30pub struct LeanWorkerSessionConfig {
31 project_root: PathBuf,
32 package: String,
33 lib_name: String,
34 imports: Vec<String>,
35}
36
37impl LeanWorkerSessionConfig {
38 pub fn new(
40 project_root: impl Into<PathBuf>,
41 package: impl Into<String>,
42 lib_name: impl Into<String>,
43 imports: impl IntoIterator<Item = impl Into<String>>,
44 ) -> Self {
45 Self {
46 project_root: project_root.into(),
47 package: package.into(),
48 lib_name: lib_name.into(),
49 imports: imports.into_iter().map(Into::into).collect(),
50 }
51 }
52
53 pub(crate) fn project_root_string(&self) -> String {
54 self.project_root.to_string_lossy().into_owned()
55 }
56
57 pub(crate) fn package(&self) -> &str {
58 &self.package
59 }
60
61 pub(crate) fn lib_name(&self) -> &str {
62 &self.lib_name
63 }
64
65 pub(crate) fn imports(&self) -> &[String] {
66 &self.imports
67 }
68}
69
70#[derive(Clone, Debug, Eq, PartialEq)]
75pub struct LeanWorkerElabOptions {
76 namespace_context: String,
77 file_label: String,
78 heartbeat_limit: u64,
79 diagnostic_byte_limit: usize,
80}
81
82impl LeanWorkerElabOptions {
83 #[must_use]
85 pub fn new() -> Self {
86 Self::default()
87 }
88
89 #[must_use]
91 pub fn namespace_context(mut self, namespace: &str) -> Self {
92 namespace.clone_into(&mut self.namespace_context);
93 self
94 }
95
96 #[must_use]
98 pub fn file_label(mut self, label: &str) -> Self {
99 label.clone_into(&mut self.file_label);
100 self
101 }
102
103 #[must_use]
105 pub fn heartbeat_limit(mut self, heartbeats: u64) -> Self {
106 self.heartbeat_limit = heartbeats;
107 self
108 }
109
110 #[must_use]
112 pub fn diagnostic_byte_limit(mut self, bytes: usize) -> Self {
113 self.diagnostic_byte_limit = bytes;
114 self
115 }
116
117 pub(crate) fn wire(&self) -> WorkerElabOptions {
118 WorkerElabOptions {
119 namespace_context: self.namespace_context.clone(),
120 file_label: self.file_label.clone(),
121 heartbeat_limit: self.heartbeat_limit,
122 diagnostic_byte_limit: self.diagnostic_byte_limit,
123 }
124 }
125}
126
127impl Default for LeanWorkerElabOptions {
128 fn default() -> Self {
129 Self {
130 namespace_context: String::new(),
131 file_label: "<elaborate>".to_owned(),
132 heartbeat_limit: lean_rs_host::LEAN_HEARTBEAT_LIMIT_DEFAULT,
133 diagnostic_byte_limit: lean_rs_host::LEAN_DIAGNOSTIC_BYTE_LIMIT_DEFAULT,
134 }
135 }
136}
137
138#[derive(Clone, Debug, Eq, PartialEq)]
144pub struct LeanWorkerRuntimeMetadata {
145 pub worker_version: String,
146 pub protocol_version: u16,
147 pub lean_version: Option<String>,
148}
149
150#[derive(Clone, Debug, Eq, PartialEq)]
156pub struct LeanWorkerCapabilityMetadata {
157 pub commands: Vec<LeanWorkerCommandMetadata>,
158 pub capabilities: Vec<LeanWorkerCapabilityFact>,
159 pub lean_version: Option<String>,
160 pub extra: Option<Value>,
161}
162
163impl From<WorkerCapabilityMetadata> for LeanWorkerCapabilityMetadata {
164 fn from(value: WorkerCapabilityMetadata) -> Self {
165 Self {
166 commands: value.commands.into_iter().map(Into::into).collect(),
167 capabilities: value.capabilities.into_iter().map(Into::into).collect(),
168 lean_version: value.lean_version,
169 extra: value.extra,
170 }
171 }
172}
173
174#[derive(Clone, Debug, Eq, PartialEq)]
176pub struct LeanWorkerCommandMetadata {
177 pub name: String,
178 pub version: String,
179}
180
181impl From<WorkerCommandMetadata> for LeanWorkerCommandMetadata {
182 fn from(value: WorkerCommandMetadata) -> Self {
183 Self {
184 name: value.name,
185 version: value.version,
186 }
187 }
188}
189
190#[derive(Clone, Debug, Eq, PartialEq)]
192pub struct LeanWorkerCapabilityFact {
193 pub name: String,
194 pub version: String,
195}
196
197impl From<WorkerCapabilityFact> for LeanWorkerCapabilityFact {
198 fn from(value: WorkerCapabilityFact) -> Self {
199 Self {
200 name: value.name,
201 version: value.version,
202 }
203 }
204}
205
206#[derive(Clone, Copy, Debug, Eq, PartialEq)]
208#[non_exhaustive]
209pub enum LeanWorkerDoctorSeverity {
210 Pass,
211 Warning,
212 Error,
213}
214
215impl From<WorkerDoctorSeverity> for LeanWorkerDoctorSeverity {
216 fn from(value: WorkerDoctorSeverity) -> Self {
217 match value {
218 WorkerDoctorSeverity::Pass => Self::Pass,
219 WorkerDoctorSeverity::Warning => Self::Warning,
220 WorkerDoctorSeverity::Error => Self::Error,
221 }
222 }
223}
224
225#[derive(Clone, Debug, Eq, PartialEq)]
227pub struct LeanWorkerDoctorDiagnostic {
228 pub severity: LeanWorkerDoctorSeverity,
229 pub code: String,
230 pub message: String,
231 pub details: Option<Value>,
232}
233
234impl From<WorkerDoctorDiagnostic> for LeanWorkerDoctorDiagnostic {
235 fn from(value: WorkerDoctorDiagnostic) -> Self {
236 Self {
237 severity: value.severity.into(),
238 code: value.code,
239 message: value.message,
240 details: value.details,
241 }
242 }
243}
244
245#[derive(Clone, Debug, Eq, PartialEq)]
247pub struct LeanWorkerDoctorReport {
248 pub diagnostics: Vec<LeanWorkerDoctorDiagnostic>,
249 pub metadata: Option<Value>,
250}
251
252impl From<WorkerDoctorReport> for LeanWorkerDoctorReport {
253 fn from(value: WorkerDoctorReport) -> Self {
254 Self {
255 diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
256 metadata: value.metadata,
257 }
258 }
259}
260
261#[derive(Clone, Debug, Default)]
268pub struct LeanWorkerCancellationToken {
269 cancelled: Arc<AtomicBool>,
270}
271
272impl LeanWorkerCancellationToken {
273 #[must_use]
275 pub fn new() -> Self {
276 Self::default()
277 }
278
279 pub fn cancel(&self) {
281 self.cancelled.store(true, Ordering::Release);
282 }
283
284 #[must_use]
286 pub fn is_cancelled(&self) -> bool {
287 self.cancelled.load(Ordering::Acquire)
288 }
289}
290
291#[derive(Clone, Debug, Eq, PartialEq)]
293pub struct LeanWorkerProgressEvent {
294 pub phase: String,
295 pub current: u64,
296 pub total: Option<u64>,
297 pub elapsed: Duration,
298}
299
300pub trait LeanWorkerProgressSink: Send + Sync {
302 fn report(&self, event: LeanWorkerProgressEvent);
303}
304
305#[derive(Clone, Debug, Eq, PartialEq)]
311pub struct LeanWorkerDataRow {
312 pub stream: String,
313 pub sequence: u64,
314 pub payload: Value,
315}
316
317impl TryFrom<DataRow> for LeanWorkerDataRow {
318 type Error = LeanWorkerError;
319
320 fn try_from(value: DataRow) -> Result<Self, Self::Error> {
321 let payload = serde_json::from_str(value.payload.get()).map_err(|err| LeanWorkerError::Protocol {
322 message: format!("worker data-row payload decode failed: {err}"),
323 })?;
324 Ok(Self {
325 stream: value.stream,
326 sequence: value.sequence,
327 payload,
328 })
329 }
330}
331
332pub trait LeanWorkerDataSink: Send + Sync {
338 fn report(&self, row: LeanWorkerDataRow);
339}
340
341pub(crate) struct LeanWorkerRawDataRow {
342 pub(crate) stream: String,
343 pub(crate) sequence: u64,
344 pub(crate) payload: Box<RawValue>,
345}
346
347impl From<DataRow> for LeanWorkerRawDataRow {
348 fn from(value: DataRow) -> Self {
349 Self {
350 stream: value.stream,
351 sequence: value.sequence,
352 payload: value.payload,
353 }
354 }
355}
356
357pub(crate) trait LeanWorkerRawDataSink: Send + Sync {
358 fn report(&self, row: LeanWorkerRawDataRow);
359}
360
361#[derive(Clone, Copy)]
362pub(crate) enum LeanWorkerDataSinkTarget<'a> {
363 Value(&'a dyn LeanWorkerDataSink),
364 Raw(&'a dyn LeanWorkerRawDataSink),
365}
366
367#[derive(Clone, Debug, Eq, PartialEq)]
373pub struct LeanWorkerDiagnosticEvent {
374 pub code: String,
375 pub message: String,
376}
377
378impl From<Diagnostic> for LeanWorkerDiagnosticEvent {
379 fn from(value: Diagnostic) -> Self {
380 Self {
381 code: value.code,
382 message: value.message,
383 }
384 }
385}
386
387pub trait LeanWorkerDiagnosticSink: Send + Sync {
389 fn report(&self, diagnostic: LeanWorkerDiagnosticEvent);
390}
391
392#[derive(Clone, Debug, Eq, PartialEq)]
398pub struct LeanWorkerStreamSummary {
399 pub total_rows: u64,
401 pub per_stream_counts: BTreeMap<String, u64>,
403 pub elapsed: Duration,
405 pub metadata: Option<Value>,
407}
408
409impl From<StreamSummary> for LeanWorkerStreamSummary {
410 fn from(value: StreamSummary) -> Self {
411 Self {
412 total_rows: value.total_rows,
413 per_stream_counts: value.per_stream_counts,
414 elapsed: Duration::from_micros(value.elapsed_micros),
415 metadata: value.metadata,
416 }
417 }
418}
419
420pub struct LeanWorkerJsonCommand<Req, Resp> {
426 export: String,
427 _types: PhantomData<fn(&Req) -> Resp>,
428}
429
430impl<Req, Resp> LeanWorkerJsonCommand<Req, Resp> {
431 #[must_use]
433 pub fn new(export: impl Into<String>) -> Self {
434 Self {
435 export: export.into(),
436 _types: PhantomData,
437 }
438 }
439
440 #[must_use]
442 pub fn export(&self) -> &str {
443 &self.export
444 }
445}
446
447impl<Req, Resp> Clone for LeanWorkerJsonCommand<Req, Resp> {
448 fn clone(&self) -> Self {
449 Self {
450 export: self.export.clone(),
451 _types: PhantomData,
452 }
453 }
454}
455
456impl<Req, Resp> fmt::Debug for LeanWorkerJsonCommand<Req, Resp> {
457 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
458 f.debug_struct("LeanWorkerJsonCommand")
459 .field("export", &self.export)
460 .finish()
461 }
462}
463
464impl<Req, Resp> PartialEq for LeanWorkerJsonCommand<Req, Resp> {
465 fn eq(&self, other: &Self) -> bool {
466 self.export == other.export
467 }
468}
469
470impl<Req, Resp> Eq for LeanWorkerJsonCommand<Req, Resp> {}
471
472pub struct LeanWorkerStreamingCommand<Req, Row, Summary> {
480 export: String,
481 _types: PhantomData<fn(&Req) -> (Row, Summary)>,
482}
483
484impl<Req, Row, Summary> LeanWorkerStreamingCommand<Req, Row, Summary> {
485 #[must_use]
487 pub fn new(export: impl Into<String>) -> Self {
488 Self {
489 export: export.into(),
490 _types: PhantomData,
491 }
492 }
493
494 #[must_use]
496 pub fn export(&self) -> &str {
497 &self.export
498 }
499}
500
501impl<Req, Row, Summary> Clone for LeanWorkerStreamingCommand<Req, Row, Summary> {
502 fn clone(&self) -> Self {
503 Self {
504 export: self.export.clone(),
505 _types: PhantomData,
506 }
507 }
508}
509
510impl<Req, Row, Summary> fmt::Debug for LeanWorkerStreamingCommand<Req, Row, Summary> {
511 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
512 f.debug_struct("LeanWorkerStreamingCommand")
513 .field("export", &self.export)
514 .finish()
515 }
516}
517
518impl<Req, Row, Summary> PartialEq for LeanWorkerStreamingCommand<Req, Row, Summary> {
519 fn eq(&self, other: &Self) -> bool {
520 self.export == other.export
521 }
522}
523
524impl<Req, Row, Summary> Eq for LeanWorkerStreamingCommand<Req, Row, Summary> {}
525
526#[derive(Clone, Debug, Eq, PartialEq)]
528pub struct LeanWorkerTypedDataRow<Row> {
529 pub stream: String,
530 pub sequence: u64,
531 pub payload: Row,
532}
533
534pub trait LeanWorkerTypedDataSink<Row>: Send + Sync {
539 fn report(&self, row: LeanWorkerTypedDataRow<Row>);
540}
541
542#[derive(Clone, Debug, Eq, PartialEq)]
548pub struct LeanWorkerTypedStreamSummary<Summary> {
549 pub total_rows: u64,
550 pub per_stream_counts: BTreeMap<String, u64>,
551 pub elapsed: Duration,
552 pub metadata: Option<Summary>,
553}
554
555#[derive(Clone, Debug, Eq, PartialEq)]
557pub struct LeanWorkerElabResult {
558 pub success: bool,
559 pub diagnostics: Vec<LeanWorkerDiagnostic>,
560 pub truncated: bool,
561}
562
563impl From<WorkerElabOutcome> for LeanWorkerElabResult {
564 fn from(value: WorkerElabOutcome) -> Self {
565 Self {
566 success: value.success,
567 diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
568 truncated: value.truncated,
569 }
570 }
571}
572
573#[derive(Clone, Copy, Debug, Eq, PartialEq)]
575#[non_exhaustive]
576pub enum LeanWorkerKernelStatus {
577 Checked,
578 Rejected,
579 Unavailable,
580 Unsupported,
581}
582
583#[derive(Clone, Debug, Eq, PartialEq)]
585pub struct LeanWorkerKernelResult {
586 pub status: LeanWorkerKernelStatus,
587 pub diagnostics: Vec<LeanWorkerDiagnostic>,
588 pub truncated: bool,
589}
590
591impl From<WorkerKernelOutcome> for LeanWorkerKernelResult {
592 fn from(value: WorkerKernelOutcome) -> Self {
593 Self {
594 status: match value.status {
595 WorkerKernelStatus::Checked => LeanWorkerKernelStatus::Checked,
596 WorkerKernelStatus::Rejected => LeanWorkerKernelStatus::Rejected,
597 WorkerKernelStatus::Unavailable => LeanWorkerKernelStatus::Unavailable,
598 WorkerKernelStatus::Unsupported => LeanWorkerKernelStatus::Unsupported,
599 },
600 diagnostics: value.diagnostics.into_iter().map(Into::into).collect(),
601 truncated: value.truncated,
602 }
603 }
604}
605
606#[derive(Clone, Debug, Eq, PartialEq)]
608pub struct LeanWorkerDiagnostic {
609 pub severity: String,
610 pub message: String,
611 pub file_label: String,
612 pub line: Option<u32>,
613 pub column: Option<u32>,
614 pub end_line: Option<u32>,
615 pub end_column: Option<u32>,
616}
617
618impl From<WorkerDiagnostic> for LeanWorkerDiagnostic {
619 fn from(value: WorkerDiagnostic) -> Self {
620 Self {
621 severity: value.severity,
622 message: value.message,
623 file_label: value.file_label,
624 line: value.line,
625 column: value.column,
626 end_line: value.end_line,
627 end_column: value.end_column,
628 }
629 }
630}
631
632pub struct LeanWorkerSession<'worker> {
638 worker: &'worker mut LeanWorker,
639 open: bool,
640}
641
642impl LeanWorker {
643 pub fn open_session<'worker>(
651 &'worker mut self,
652 config: &LeanWorkerSessionConfig,
653 cancellation: Option<&LeanWorkerCancellationToken>,
654 progress: Option<&dyn LeanWorkerProgressSink>,
655 ) -> Result<LeanWorkerSession<'worker>, LeanWorkerError> {
656 self.open_worker_session(config, cancellation, progress)?;
657 Ok(LeanWorkerSession {
658 worker: self,
659 open: true,
660 })
661 }
662}
663
664impl LeanWorkerSession<'_> {
665 #[must_use]
667 pub fn request_timeout(&self) -> Duration {
668 self.worker.request_timeout()
669 }
670
671 pub fn set_request_timeout(&mut self, timeout: Duration) {
676 self.worker.set_request_timeout(timeout);
677 }
678
679 pub fn elaborate(
687 &mut self,
688 source: &str,
689 options: &LeanWorkerElabOptions,
690 cancellation: Option<&LeanWorkerCancellationToken>,
691 progress: Option<&dyn LeanWorkerProgressSink>,
692 ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
693 self.ensure_open()?;
694 match self.worker.worker_elaborate(source, options, cancellation, progress) {
695 Ok(value) => Ok(value),
696 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
697 self.open = false;
698 Err(err)
699 }
700 Err(err) => Err(err),
701 }
702 }
703
704 pub fn kernel_check(
712 &mut self,
713 source: &str,
714 options: &LeanWorkerElabOptions,
715 cancellation: Option<&LeanWorkerCancellationToken>,
716 progress: Option<&dyn LeanWorkerProgressSink>,
717 ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
718 self.ensure_open()?;
719 match self.worker.worker_kernel_check(source, options, cancellation, progress) {
720 Ok(value) => Ok(value),
721 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
722 self.open = false;
723 Err(err)
724 }
725 Err(err) => Err(err),
726 }
727 }
728
729 pub fn declaration_kinds(
737 &mut self,
738 names: &[&str],
739 cancellation: Option<&LeanWorkerCancellationToken>,
740 progress: Option<&dyn LeanWorkerProgressSink>,
741 ) -> Result<Vec<String>, LeanWorkerError> {
742 self.ensure_open()?;
743 match self.worker.worker_declaration_kinds(names, cancellation, progress) {
744 Ok(value) => Ok(value),
745 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
746 self.open = false;
747 Err(err)
748 }
749 Err(err) => Err(err),
750 }
751 }
752
753 pub fn declaration_names(
761 &mut self,
762 names: &[&str],
763 cancellation: Option<&LeanWorkerCancellationToken>,
764 progress: Option<&dyn LeanWorkerProgressSink>,
765 ) -> Result<Vec<String>, LeanWorkerError> {
766 self.ensure_open()?;
767 match self.worker.worker_declaration_names(names, cancellation, progress) {
768 Ok(value) => Ok(value),
769 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
770 self.open = false;
771 Err(err)
772 }
773 Err(err) => Err(err),
774 }
775 }
776
777 pub fn run_data_stream(
791 &mut self,
792 export: &str,
793 request: &Value,
794 rows: &dyn LeanWorkerDataSink,
795 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
796 cancellation: Option<&LeanWorkerCancellationToken>,
797 progress: Option<&dyn LeanWorkerProgressSink>,
798 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
799 self.ensure_open()?;
800 match self
801 .worker
802 .worker_run_data_stream(export, request, rows, diagnostics, cancellation, progress)
803 {
804 Ok(value) => Ok(value),
805 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
806 self.open = false;
807 Err(err)
808 }
809 Err(err) => Err(err),
810 }
811 }
812
813 fn run_data_stream_raw(
814 &mut self,
815 export: &str,
816 request: &Value,
817 rows: &dyn LeanWorkerRawDataSink,
818 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
819 cancellation: Option<&LeanWorkerCancellationToken>,
820 progress: Option<&dyn LeanWorkerProgressSink>,
821 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
822 self.ensure_open()?;
823 match self
824 .worker
825 .worker_run_data_stream_raw(export, request, rows, diagnostics, cancellation, progress)
826 {
827 Ok(value) => Ok(value),
828 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
829 self.open = false;
830 Err(err)
831 }
832 Err(err) => Err(err),
833 }
834 }
835
836 pub fn run_json_command<Req, Resp>(
849 &mut self,
850 command: &LeanWorkerJsonCommand<Req, Resp>,
851 request: &Req,
852 cancellation: Option<&LeanWorkerCancellationToken>,
853 progress: Option<&dyn LeanWorkerProgressSink>,
854 ) -> Result<Resp, LeanWorkerError>
855 where
856 Req: Serialize,
857 Resp: DeserializeOwned,
858 {
859 self.ensure_open()?;
860 let request_json =
861 serde_json::to_string(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
862 export: command.export().to_owned(),
863 message: err.to_string(),
864 })?;
865 match self
866 .worker
867 .worker_json_command(command.export(), request_json, cancellation, progress)
868 {
869 Ok(response_json) => {
870 serde_json::from_str(&response_json).map_err(|err| LeanWorkerError::TypedCommandResponseDecode {
871 export: command.export().to_owned(),
872 message: err.to_string(),
873 })
874 }
875 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
876 self.open = false;
877 Err(err)
878 }
879 Err(err) => Err(err),
880 }
881 }
882
883 pub fn run_streaming_command<Req, Row, Summary>(
899 &mut self,
900 command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
901 request: &Req,
902 rows: &dyn LeanWorkerTypedDataSink<Row>,
903 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
904 cancellation: Option<&LeanWorkerCancellationToken>,
905 progress: Option<&dyn LeanWorkerProgressSink>,
906 ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
907 where
908 Req: Serialize,
909 Row: DeserializeOwned,
910 Summary: DeserializeOwned,
911 {
912 self.ensure_open()?;
913 let request_value =
914 serde_json::to_value(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
915 export: command.export().to_owned(),
916 message: err.to_string(),
917 })?;
918 let internal_cancellation = LeanWorkerCancellationToken::new();
919 let cancellation_for_stream = cancellation.unwrap_or(&internal_cancellation);
920 let typed_sink = TypedRawDataSink {
921 export: command.export(),
922 rows,
923 cancellation: cancellation_for_stream,
924 decode_error: std::sync::Mutex::new(None),
925 };
926
927 match self.run_data_stream_raw(
928 command.export(),
929 &request_value,
930 &typed_sink,
931 diagnostics,
932 Some(cancellation_for_stream),
933 progress,
934 ) {
935 Ok(summary) => {
936 if let Some(err) = typed_sink.take_decode_error() {
937 return Err(err);
938 }
939 let metadata = summary
940 .metadata
941 .map(|metadata| {
942 serde_json::from_value(metadata).map_err(|err| LeanWorkerError::TypedCommandSummaryDecode {
943 export: command.export().to_owned(),
944 message: err.to_string(),
945 })
946 })
947 .transpose()?;
948 Ok(LeanWorkerTypedStreamSummary {
949 total_rows: summary.total_rows,
950 per_stream_counts: summary.per_stream_counts,
951 elapsed: summary.elapsed,
952 metadata,
953 })
954 }
955 Err(LeanWorkerError::Cancelled { .. }) => {
956 if let Some(err) = typed_sink.take_decode_error() {
957 Err(err)
958 } else {
959 self.open = false;
960 Err(LeanWorkerError::Cancelled {
961 operation: "worker_run_data_stream",
962 })
963 }
964 }
965 Err(err) => Err(err),
966 }
967 }
968
969 pub fn capability_metadata(
982 &mut self,
983 export: &str,
984 request: &Value,
985 cancellation: Option<&LeanWorkerCancellationToken>,
986 progress: Option<&dyn LeanWorkerProgressSink>,
987 ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
988 self.ensure_open()?;
989 match self
990 .worker
991 .worker_capability_metadata(export, request, cancellation, progress)
992 {
993 Ok(value) => Ok(value),
994 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
995 self.open = false;
996 Err(err)
997 }
998 Err(err) => Err(err),
999 }
1000 }
1001
1002 pub fn capability_doctor(
1015 &mut self,
1016 export: &str,
1017 request: &Value,
1018 cancellation: Option<&LeanWorkerCancellationToken>,
1019 progress: Option<&dyn LeanWorkerProgressSink>,
1020 ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
1021 self.ensure_open()?;
1022 match self
1023 .worker
1024 .worker_capability_doctor(export, request, cancellation, progress)
1025 {
1026 Ok(value) => Ok(value),
1027 Err(err @ (LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })) => {
1028 self.open = false;
1029 Err(err)
1030 }
1031 Err(err) => Err(err),
1032 }
1033 }
1034
1035 fn ensure_open(&self) -> Result<(), LeanWorkerError> {
1036 if self.open {
1037 Ok(())
1038 } else {
1039 Err(LeanWorkerError::UnsupportedRequest {
1040 operation: "worker_session_invalidated",
1041 })
1042 }
1043 }
1044}
1045
1046struct TypedRawDataSink<'a, Row> {
1047 export: &'a str,
1048 rows: &'a dyn LeanWorkerTypedDataSink<Row>,
1049 cancellation: &'a LeanWorkerCancellationToken,
1050 decode_error: std::sync::Mutex<Option<LeanWorkerError>>,
1051}
1052
1053impl<Row> TypedRawDataSink<'_, Row> {
1054 fn take_decode_error(&self) -> Option<LeanWorkerError> {
1055 self.decode_error.lock().ok().and_then(|mut guard| guard.take())
1056 }
1057}
1058
1059impl<Row> LeanWorkerRawDataSink for TypedRawDataSink<'_, Row>
1060where
1061 Row: DeserializeOwned,
1062{
1063 fn report(&self, row: LeanWorkerRawDataRow) {
1064 match serde_json::from_str(row.payload.get()) {
1065 Ok(payload) => self.rows.report(LeanWorkerTypedDataRow {
1066 stream: row.stream,
1067 sequence: row.sequence,
1068 payload,
1069 }),
1070 Err(err) => {
1071 if let Ok(mut guard) = self.decode_error.lock() {
1072 *guard = Some(LeanWorkerError::TypedCommandRowDecode {
1073 export: self.export.to_owned(),
1074 stream: row.stream,
1075 sequence: row.sequence,
1076 message: err.to_string(),
1077 });
1078 }
1079 self.cancellation.cancel();
1080 }
1081 }
1082 }
1083}
1084
1085pub(crate) fn check_cancelled(
1086 operation: &'static str,
1087 token: Option<&LeanWorkerCancellationToken>,
1088) -> Result<(), LeanWorkerError> {
1089 if token.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
1090 Err(LeanWorkerError::Cancelled { operation })
1091 } else {
1092 Ok(())
1093 }
1094}
1095
1096pub(crate) fn report_parent_progress(
1097 sink: Option<&dyn LeanWorkerProgressSink>,
1098 event: LeanWorkerProgressEvent,
1099) -> Result<(), LeanWorkerError> {
1100 let Some(sink) = sink else {
1101 return Ok(());
1102 };
1103 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(event))).map_err(|payload| {
1104 let message = if let Some(s) = payload.downcast_ref::<&str>() {
1105 (*s).to_owned()
1106 } else if let Some(s) = payload.downcast_ref::<String>() {
1107 s.clone()
1108 } else {
1109 "worker progress sink panicked".to_owned()
1110 };
1111 LeanWorkerError::ProgressPanic { message }
1112 })
1113}
1114
1115pub(crate) fn report_parent_data_row(
1116 sink: Option<LeanWorkerDataSinkTarget<'_>>,
1117 row: DataRow,
1118) -> Result<(), LeanWorkerError> {
1119 let Some(sink) = sink else {
1120 return Err(LeanWorkerError::Protocol {
1121 message: "worker sent data row for a request without a row sink".to_owned(),
1122 });
1123 };
1124 match sink {
1125 LeanWorkerDataSinkTarget::Value(sink) => {
1126 let row = LeanWorkerDataRow::try_from(row)?;
1127 report_value_data_row(sink, row)
1128 }
1129 LeanWorkerDataSinkTarget::Raw(sink) => report_raw_data_row(sink, row.into()),
1130 }
1131}
1132
1133fn report_value_data_row(sink: &dyn LeanWorkerDataSink, row: LeanWorkerDataRow) -> Result<(), LeanWorkerError> {
1134 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
1135 let message = if let Some(s) = payload.downcast_ref::<&str>() {
1136 (*s).to_owned()
1137 } else if let Some(s) = payload.downcast_ref::<String>() {
1138 s.clone()
1139 } else {
1140 "worker data sink panicked".to_owned()
1141 };
1142 LeanWorkerError::DataSinkPanic { message }
1143 })
1144}
1145
1146fn report_raw_data_row(sink: &dyn LeanWorkerRawDataSink, row: LeanWorkerRawDataRow) -> Result<(), LeanWorkerError> {
1147 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
1148 let message = if let Some(s) = payload.downcast_ref::<&str>() {
1149 (*s).to_owned()
1150 } else if let Some(s) = payload.downcast_ref::<String>() {
1151 s.clone()
1152 } else {
1153 "worker data sink panicked".to_owned()
1154 };
1155 LeanWorkerError::DataSinkPanic { message }
1156 })
1157}
1158
1159pub(crate) fn report_parent_diagnostic(
1160 sink: Option<&dyn LeanWorkerDiagnosticSink>,
1161 diagnostic: LeanWorkerDiagnosticEvent,
1162) -> Result<(), LeanWorkerError> {
1163 let Some(sink) = sink else {
1164 return Ok(());
1165 };
1166 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(diagnostic))).map_err(|payload| {
1167 let message = if let Some(s) = payload.downcast_ref::<&str>() {
1168 (*s).to_owned()
1169 } else if let Some(s) = payload.downcast_ref::<String>() {
1170 s.clone()
1171 } else {
1172 "worker diagnostic sink panicked".to_owned()
1173 };
1174 LeanWorkerError::DiagnosticSinkPanic { message }
1175 })
1176}
1177
1178pub(crate) fn elapsed_event(
1179 phase: String,
1180 current: u64,
1181 total: Option<u64>,
1182 started: Instant,
1183) -> LeanWorkerProgressEvent {
1184 LeanWorkerProgressEvent {
1185 phase,
1186 current,
1187 total,
1188 elapsed: started.elapsed(),
1189 }
1190}