1use std::collections::BTreeMap;
9use std::fmt;
10use std::marker::PhantomData;
11use std::path::PathBuf;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::time::{Duration, Instant};
15
16use serde::Serialize;
17use serde::de::DeserializeOwned;
18use serde_json::Value;
19use serde_json::value::RawValue;
20
21use crate::protocol::{DataRow, Diagnostic, StreamSummary};
22use crate::supervisor::{LeanWorker, LeanWorkerError};
23use crate::types::{
24 LeanWorkerCapabilityMetadata, LeanWorkerDeclarationFilter, LeanWorkerDeclarationRow, LeanWorkerDoctorReport,
25 LeanWorkerElabOptions, LeanWorkerElabResult, LeanWorkerKernelResult, LeanWorkerMetaResult,
26 LeanWorkerMetaTransparency, LeanWorkerProcessFileOutcome, LeanWorkerProcessModuleOutcome, LeanWorkerRendered,
27};
28
29#[derive(Clone, Debug, Eq, PartialEq)]
31pub struct LeanWorkerSessionConfig {
32 project_root: PathBuf,
33 package: String,
34 lib_name: String,
35 imports: Vec<String>,
36}
37
38impl LeanWorkerSessionConfig {
39 pub fn new(
41 project_root: impl Into<PathBuf>,
42 package: impl Into<String>,
43 lib_name: impl Into<String>,
44 imports: impl IntoIterator<Item = impl Into<String>>,
45 ) -> Self {
46 Self {
47 project_root: project_root.into(),
48 package: package.into(),
49 lib_name: lib_name.into(),
50 imports: imports.into_iter().map(Into::into).collect(),
51 }
52 }
53
54 pub(crate) fn project_root_string(&self) -> String {
55 self.project_root.to_string_lossy().into_owned()
56 }
57
58 pub(crate) fn package(&self) -> &str {
59 &self.package
60 }
61
62 pub(crate) fn lib_name(&self) -> &str {
63 &self.lib_name
64 }
65
66 pub(crate) fn imports(&self) -> &[String] {
67 &self.imports
68 }
69}
70
71#[derive(Clone, Debug, Eq, PartialEq)]
77pub struct LeanWorkerRuntimeMetadata {
78 pub worker_version: String,
79 pub protocol_version: u16,
80 pub lean_version: Option<String>,
81}
82
83#[derive(Clone, Debug, Default)]
90pub struct LeanWorkerCancellationToken {
91 cancelled: Arc<AtomicBool>,
92}
93
94impl LeanWorkerCancellationToken {
95 #[must_use]
97 pub fn new() -> Self {
98 Self::default()
99 }
100
101 pub fn cancel(&self) {
103 self.cancelled.store(true, Ordering::Release);
104 }
105
106 #[must_use]
108 pub fn is_cancelled(&self) -> bool {
109 self.cancelled.load(Ordering::Acquire)
110 }
111}
112
113#[derive(Clone, Debug, Eq, PartialEq)]
115pub struct LeanWorkerProgressEvent {
116 pub phase: String,
117 pub current: u64,
118 pub total: Option<u64>,
119 pub elapsed: Duration,
120}
121
122pub trait LeanWorkerProgressSink: Send + Sync {
124 fn report(&self, event: LeanWorkerProgressEvent);
125}
126
127#[derive(Clone, Debug, Eq, PartialEq)]
133pub struct LeanWorkerDataRow {
134 pub stream: String,
135 pub sequence: u64,
136 pub payload: Value,
137}
138
139impl TryFrom<DataRow> for LeanWorkerDataRow {
140 type Error = LeanWorkerError;
141
142 fn try_from(value: DataRow) -> Result<Self, Self::Error> {
143 let payload = serde_json::from_str(value.payload.get()).map_err(|err| LeanWorkerError::Protocol {
144 message: format!("worker data-row payload decode failed: {err}"),
145 })?;
146 Ok(Self {
147 stream: value.stream,
148 sequence: value.sequence,
149 payload,
150 })
151 }
152}
153
154pub trait LeanWorkerDataSink: Send + Sync {
160 fn report(&self, row: LeanWorkerDataRow);
161}
162
163pub(crate) struct LeanWorkerRawDataRow {
164 pub(crate) stream: String,
165 pub(crate) sequence: u64,
166 pub(crate) payload: Box<RawValue>,
167}
168
169impl From<DataRow> for LeanWorkerRawDataRow {
170 fn from(value: DataRow) -> Self {
171 Self {
172 stream: value.stream,
173 sequence: value.sequence,
174 payload: value.payload,
175 }
176 }
177}
178
179pub(crate) trait LeanWorkerRawDataSink: Send + Sync {
180 fn report(&self, row: LeanWorkerRawDataRow);
181}
182
183#[derive(Clone, Copy)]
184pub(crate) enum LeanWorkerDataSinkTarget<'a> {
185 Value(&'a dyn LeanWorkerDataSink),
186 Raw(&'a dyn LeanWorkerRawDataSink),
187}
188
189#[derive(Clone, Debug, Eq, PartialEq)]
195pub struct LeanWorkerDiagnosticEvent {
196 pub code: String,
197 pub message: String,
198}
199
200impl From<Diagnostic> for LeanWorkerDiagnosticEvent {
201 fn from(value: Diagnostic) -> Self {
202 Self {
203 code: value.code,
204 message: value.message,
205 }
206 }
207}
208
209pub trait LeanWorkerDiagnosticSink: Send + Sync {
211 fn report(&self, diagnostic: LeanWorkerDiagnosticEvent);
212}
213
214#[derive(Clone, Debug, Eq, PartialEq)]
220pub struct LeanWorkerStreamSummary {
221 pub total_rows: u64,
223 pub per_stream_counts: BTreeMap<String, u64>,
225 pub elapsed: Duration,
227 pub metadata: Option<Value>,
229}
230
231impl From<StreamSummary> for LeanWorkerStreamSummary {
232 fn from(value: StreamSummary) -> Self {
233 Self {
234 total_rows: value.total_rows,
235 per_stream_counts: value.per_stream_counts,
236 elapsed: Duration::from_micros(value.elapsed_micros),
237 metadata: value.metadata,
238 }
239 }
240}
241
242pub struct LeanWorkerJsonCommand<Req, Resp> {
248 export: String,
249 _types: PhantomData<fn(&Req) -> Resp>,
250}
251
252impl<Req, Resp> LeanWorkerJsonCommand<Req, Resp> {
253 #[must_use]
255 pub fn new(export: impl Into<String>) -> Self {
256 Self {
257 export: export.into(),
258 _types: PhantomData,
259 }
260 }
261
262 #[must_use]
264 pub fn export(&self) -> &str {
265 &self.export
266 }
267}
268
269impl<Req, Resp> Clone for LeanWorkerJsonCommand<Req, Resp> {
270 fn clone(&self) -> Self {
271 Self {
272 export: self.export.clone(),
273 _types: PhantomData,
274 }
275 }
276}
277
278impl<Req, Resp> fmt::Debug for LeanWorkerJsonCommand<Req, Resp> {
279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280 f.debug_struct("LeanWorkerJsonCommand")
281 .field("export", &self.export)
282 .finish()
283 }
284}
285
286impl<Req, Resp> PartialEq for LeanWorkerJsonCommand<Req, Resp> {
287 fn eq(&self, other: &Self) -> bool {
288 self.export == other.export
289 }
290}
291
292impl<Req, Resp> Eq for LeanWorkerJsonCommand<Req, Resp> {}
293
294pub struct LeanWorkerStreamingCommand<Req, Row, Summary> {
302 export: String,
303 _types: PhantomData<fn(&Req) -> (Row, Summary)>,
304}
305
306impl<Req, Row, Summary> LeanWorkerStreamingCommand<Req, Row, Summary> {
307 #[must_use]
309 pub fn new(export: impl Into<String>) -> Self {
310 Self {
311 export: export.into(),
312 _types: PhantomData,
313 }
314 }
315
316 #[must_use]
318 pub fn export(&self) -> &str {
319 &self.export
320 }
321}
322
323impl<Req, Row, Summary> Clone for LeanWorkerStreamingCommand<Req, Row, Summary> {
324 fn clone(&self) -> Self {
325 Self {
326 export: self.export.clone(),
327 _types: PhantomData,
328 }
329 }
330}
331
332impl<Req, Row, Summary> fmt::Debug for LeanWorkerStreamingCommand<Req, Row, Summary> {
333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334 f.debug_struct("LeanWorkerStreamingCommand")
335 .field("export", &self.export)
336 .finish()
337 }
338}
339
340impl<Req, Row, Summary> PartialEq for LeanWorkerStreamingCommand<Req, Row, Summary> {
341 fn eq(&self, other: &Self) -> bool {
342 self.export == other.export
343 }
344}
345
346impl<Req, Row, Summary> Eq for LeanWorkerStreamingCommand<Req, Row, Summary> {}
347
348#[derive(Clone, Debug, Eq, PartialEq)]
350pub struct LeanWorkerTypedDataRow<Row> {
351 pub stream: String,
352 pub sequence: u64,
353 pub payload: Row,
354}
355
356pub trait LeanWorkerTypedDataSink<Row>: Send + Sync {
361 fn report(&self, row: LeanWorkerTypedDataRow<Row>);
362}
363
364#[derive(Clone, Debug, Eq, PartialEq)]
370pub struct LeanWorkerTypedStreamSummary<Summary> {
371 pub total_rows: u64,
372 pub per_stream_counts: BTreeMap<String, u64>,
373 pub elapsed: Duration,
374 pub metadata: Option<Summary>,
375}
376
377pub struct LeanWorkerSession<'worker> {
383 worker: &'worker mut LeanWorker,
384 open: bool,
385}
386
387impl LeanWorker {
388 pub fn open_session<'worker>(
396 &'worker mut self,
397 config: &LeanWorkerSessionConfig,
398 cancellation: Option<&LeanWorkerCancellationToken>,
399 progress: Option<&dyn LeanWorkerProgressSink>,
400 ) -> Result<LeanWorkerSession<'worker>, LeanWorkerError> {
401 self.open_worker_session(config, cancellation, progress)?;
402 Ok(LeanWorkerSession {
403 worker: self,
404 open: true,
405 })
406 }
407}
408
409impl LeanWorkerSession<'_> {
410 #[must_use]
412 pub fn request_timeout(&self) -> Duration {
413 self.worker.request_timeout()
414 }
415
416 pub fn set_request_timeout(&mut self, timeout: Duration) {
421 self.worker.set_request_timeout(timeout);
422 }
423
424 pub fn elaborate(
432 &mut self,
433 source: &str,
434 options: &LeanWorkerElabOptions,
435 cancellation: Option<&LeanWorkerCancellationToken>,
436 progress: Option<&dyn LeanWorkerProgressSink>,
437 ) -> Result<LeanWorkerElabResult, LeanWorkerError> {
438 self.with_session(|worker| worker.worker_elaborate(source, options, cancellation, progress))
439 }
440
441 pub fn kernel_check(
449 &mut self,
450 source: &str,
451 options: &LeanWorkerElabOptions,
452 cancellation: Option<&LeanWorkerCancellationToken>,
453 progress: Option<&dyn LeanWorkerProgressSink>,
454 ) -> Result<LeanWorkerKernelResult, LeanWorkerError> {
455 self.with_session(|worker| worker.worker_kernel_check(source, options, cancellation, progress))
456 }
457
458 pub fn declaration_kinds(
466 &mut self,
467 names: &[&str],
468 cancellation: Option<&LeanWorkerCancellationToken>,
469 progress: Option<&dyn LeanWorkerProgressSink>,
470 ) -> Result<Vec<String>, LeanWorkerError> {
471 self.with_session(|worker| worker.worker_declaration_kinds(names, cancellation, progress))
472 }
473
474 pub fn declaration_names(
482 &mut self,
483 names: &[&str],
484 cancellation: Option<&LeanWorkerCancellationToken>,
485 progress: Option<&dyn LeanWorkerProgressSink>,
486 ) -> Result<Vec<String>, LeanWorkerError> {
487 self.with_session(|worker| worker.worker_declaration_names(names, cancellation, progress))
488 }
489
490 pub fn infer_type(
516 &mut self,
517 source: &str,
518 options: &LeanWorkerElabOptions,
519 cancellation: Option<&LeanWorkerCancellationToken>,
520 progress: Option<&dyn LeanWorkerProgressSink>,
521 ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
522 self.with_session(|worker| worker.worker_infer_type(source, options, cancellation, progress))
523 }
524
525 pub fn whnf(
540 &mut self,
541 source: &str,
542 options: &LeanWorkerElabOptions,
543 cancellation: Option<&LeanWorkerCancellationToken>,
544 progress: Option<&dyn LeanWorkerProgressSink>,
545 ) -> Result<LeanWorkerMetaResult<LeanWorkerRendered>, LeanWorkerError> {
546 self.with_session(|worker| worker.worker_whnf(source, options, cancellation, progress))
547 }
548
549 pub fn is_def_eq(
558 &mut self,
559 lhs: &str,
560 rhs: &str,
561 transparency: LeanWorkerMetaTransparency,
562 options: &LeanWorkerElabOptions,
563 cancellation: Option<&LeanWorkerCancellationToken>,
564 progress: Option<&dyn LeanWorkerProgressSink>,
565 ) -> Result<LeanWorkerMetaResult<bool>, LeanWorkerError> {
566 self.with_session(|worker| worker.worker_is_def_eq(lhs, rhs, transparency, options, cancellation, progress))
567 }
568
569 pub fn describe(
580 &mut self,
581 name: &str,
582 cancellation: Option<&LeanWorkerCancellationToken>,
583 progress: Option<&dyn LeanWorkerProgressSink>,
584 ) -> Result<Option<LeanWorkerDeclarationRow>, LeanWorkerError> {
585 self.with_session(|worker| worker.worker_describe(name, cancellation, progress))
586 }
587
588 pub fn list_declarations_strings(
599 &mut self,
600 filter: &LeanWorkerDeclarationFilter,
601 cancellation: Option<&LeanWorkerCancellationToken>,
602 progress: Option<&dyn LeanWorkerProgressSink>,
603 ) -> Result<Vec<String>, LeanWorkerError> {
604 self.with_session(|worker| worker.worker_list_declarations_strings(*filter, cancellation, progress))
605 }
606
607 pub fn describe_bulk(
619 &mut self,
620 names: &[&str],
621 cancellation: Option<&LeanWorkerCancellationToken>,
622 progress: Option<&dyn LeanWorkerProgressSink>,
623 ) -> Result<Vec<LeanWorkerDeclarationRow>, LeanWorkerError> {
624 self.with_session(|worker| worker.worker_describe_bulk(names, cancellation, progress))
625 }
626
627 pub fn process_file(
640 &mut self,
641 source: &str,
642 options: &LeanWorkerElabOptions,
643 cancellation: Option<&LeanWorkerCancellationToken>,
644 progress: Option<&dyn LeanWorkerProgressSink>,
645 ) -> Result<LeanWorkerProcessFileOutcome, LeanWorkerError> {
646 self.with_session(|worker| worker.worker_process_file(source, options, cancellation, progress))
647 }
648
649 pub fn process_module(
660 &mut self,
661 source: &str,
662 options: &LeanWorkerElabOptions,
663 cancellation: Option<&LeanWorkerCancellationToken>,
664 progress: Option<&dyn LeanWorkerProgressSink>,
665 ) -> Result<LeanWorkerProcessModuleOutcome, LeanWorkerError> {
666 self.with_session(|worker| worker.worker_process_module(source, options, cancellation, progress))
667 }
668
669 pub fn run_data_stream(
683 &mut self,
684 export: &str,
685 request: &Value,
686 rows: &dyn LeanWorkerDataSink,
687 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
688 cancellation: Option<&LeanWorkerCancellationToken>,
689 progress: Option<&dyn LeanWorkerProgressSink>,
690 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
691 self.with_session(|worker| {
692 worker.worker_run_data_stream(export, request, rows, diagnostics, cancellation, progress)
693 })
694 }
695
696 fn run_data_stream_raw(
697 &mut self,
698 export: &str,
699 request: &Value,
700 rows: &dyn LeanWorkerRawDataSink,
701 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
702 cancellation: Option<&LeanWorkerCancellationToken>,
703 progress: Option<&dyn LeanWorkerProgressSink>,
704 ) -> Result<LeanWorkerStreamSummary, LeanWorkerError> {
705 self.with_session(|worker| {
706 worker.worker_run_data_stream_raw(export, request, rows, diagnostics, cancellation, progress)
707 })
708 }
709
710 pub fn run_json_command<Req, Resp>(
723 &mut self,
724 command: &LeanWorkerJsonCommand<Req, Resp>,
725 request: &Req,
726 cancellation: Option<&LeanWorkerCancellationToken>,
727 progress: Option<&dyn LeanWorkerProgressSink>,
728 ) -> Result<Resp, LeanWorkerError>
729 where
730 Req: Serialize,
731 Resp: DeserializeOwned,
732 {
733 let request_json =
734 serde_json::to_string(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
735 export: command.export().to_owned(),
736 message: err.to_string(),
737 })?;
738 let response_json = self.with_session(|worker| {
739 worker.worker_json_command(command.export(), request_json, cancellation, progress)
740 })?;
741 serde_json::from_str(&response_json).map_err(|err| LeanWorkerError::TypedCommandResponseDecode {
742 export: command.export().to_owned(),
743 message: err.to_string(),
744 })
745 }
746
747 pub fn run_streaming_command<Req, Row, Summary>(
763 &mut self,
764 command: &LeanWorkerStreamingCommand<Req, Row, Summary>,
765 request: &Req,
766 rows: &dyn LeanWorkerTypedDataSink<Row>,
767 diagnostics: Option<&dyn LeanWorkerDiagnosticSink>,
768 cancellation: Option<&LeanWorkerCancellationToken>,
769 progress: Option<&dyn LeanWorkerProgressSink>,
770 ) -> Result<LeanWorkerTypedStreamSummary<Summary>, LeanWorkerError>
771 where
772 Req: Serialize,
773 Row: DeserializeOwned,
774 Summary: DeserializeOwned,
775 {
776 let request_value =
777 serde_json::to_value(request).map_err(|err| LeanWorkerError::TypedCommandRequestEncode {
778 export: command.export().to_owned(),
779 message: err.to_string(),
780 })?;
781 let internal_cancellation = LeanWorkerCancellationToken::new();
782 let cancellation_for_stream = cancellation.unwrap_or(&internal_cancellation);
783 let typed_sink = TypedRawDataSink {
784 export: command.export(),
785 rows,
786 cancellation: cancellation_for_stream,
787 decode_error: std::sync::Mutex::new(None),
788 };
789
790 match self.run_data_stream_raw(
793 command.export(),
794 &request_value,
795 &typed_sink,
796 diagnostics,
797 Some(cancellation_for_stream),
798 progress,
799 ) {
800 Ok(summary) => {
801 if let Some(err) = typed_sink.take_decode_error() {
802 return Err(err);
803 }
804 let metadata = summary
805 .metadata
806 .map(|metadata| {
807 serde_json::from_value(metadata).map_err(|err| LeanWorkerError::TypedCommandSummaryDecode {
808 export: command.export().to_owned(),
809 message: err.to_string(),
810 })
811 })
812 .transpose()?;
813 Ok(LeanWorkerTypedStreamSummary {
814 total_rows: summary.total_rows,
815 per_stream_counts: summary.per_stream_counts,
816 elapsed: summary.elapsed,
817 metadata,
818 })
819 }
820 Err(LeanWorkerError::Cancelled { .. }) => {
821 if let Some(err) = typed_sink.take_decode_error() {
822 Err(err)
823 } else {
824 Err(LeanWorkerError::Cancelled {
825 operation: "worker_run_data_stream",
826 })
827 }
828 }
829 Err(err) => Err(err),
830 }
831 }
832
833 pub fn capability_metadata(
846 &mut self,
847 export: &str,
848 request: &Value,
849 cancellation: Option<&LeanWorkerCancellationToken>,
850 progress: Option<&dyn LeanWorkerProgressSink>,
851 ) -> Result<LeanWorkerCapabilityMetadata, LeanWorkerError> {
852 self.with_session(|worker| worker.worker_capability_metadata(export, request, cancellation, progress))
853 }
854
855 pub fn capability_doctor(
868 &mut self,
869 export: &str,
870 request: &Value,
871 cancellation: Option<&LeanWorkerCancellationToken>,
872 progress: Option<&dyn LeanWorkerProgressSink>,
873 ) -> Result<LeanWorkerDoctorReport, LeanWorkerError> {
874 self.with_session(|worker| worker.worker_capability_doctor(export, request, cancellation, progress))
875 }
876
877 fn ensure_open(&self) -> Result<(), LeanWorkerError> {
878 if self.open {
879 Ok(())
880 } else {
881 Err(LeanWorkerError::UnsupportedRequest {
882 operation: "worker_session_invalidated",
883 })
884 }
885 }
886
887 fn with_session<T>(
895 &mut self,
896 f: impl FnOnce(&mut LeanWorker) -> Result<T, LeanWorkerError>,
897 ) -> Result<T, LeanWorkerError> {
898 self.ensure_open()?;
899 let result = f(self.worker);
900 if matches!(
901 result,
902 Err(LeanWorkerError::Cancelled { .. } | LeanWorkerError::Timeout { .. })
903 ) {
904 self.open = false;
905 }
906 result
907 }
908}
909
910struct TypedRawDataSink<'a, Row> {
911 export: &'a str,
912 rows: &'a dyn LeanWorkerTypedDataSink<Row>,
913 cancellation: &'a LeanWorkerCancellationToken,
914 decode_error: std::sync::Mutex<Option<LeanWorkerError>>,
915}
916
917impl<Row> TypedRawDataSink<'_, Row> {
918 fn take_decode_error(&self) -> Option<LeanWorkerError> {
919 self.decode_error.lock().ok().and_then(|mut guard| guard.take())
920 }
921}
922
923impl<Row> LeanWorkerRawDataSink for TypedRawDataSink<'_, Row>
924where
925 Row: DeserializeOwned,
926{
927 fn report(&self, row: LeanWorkerRawDataRow) {
928 match serde_json::from_str(row.payload.get()) {
929 Ok(payload) => self.rows.report(LeanWorkerTypedDataRow {
930 stream: row.stream,
931 sequence: row.sequence,
932 payload,
933 }),
934 Err(err) => {
935 if let Ok(mut guard) = self.decode_error.lock() {
936 *guard = Some(LeanWorkerError::TypedCommandRowDecode {
937 export: self.export.to_owned(),
938 stream: row.stream,
939 sequence: row.sequence,
940 message: err.to_string(),
941 });
942 }
943 self.cancellation.cancel();
944 }
945 }
946 }
947}
948
949pub(crate) fn check_cancelled(
950 operation: &'static str,
951 token: Option<&LeanWorkerCancellationToken>,
952) -> Result<(), LeanWorkerError> {
953 if token.is_some_and(LeanWorkerCancellationToken::is_cancelled) {
954 Err(LeanWorkerError::Cancelled { operation })
955 } else {
956 Ok(())
957 }
958}
959
960pub(crate) fn report_parent_progress(
961 sink: Option<&dyn LeanWorkerProgressSink>,
962 event: LeanWorkerProgressEvent,
963) -> Result<(), LeanWorkerError> {
964 let Some(sink) = sink else {
965 return Ok(());
966 };
967 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(event))).map_err(|payload| {
968 let message = if let Some(s) = payload.downcast_ref::<&str>() {
969 (*s).to_owned()
970 } else if let Some(s) = payload.downcast_ref::<String>() {
971 s.clone()
972 } else {
973 "worker progress sink panicked".to_owned()
974 };
975 LeanWorkerError::ProgressPanic { message }
976 })
977}
978
979pub(crate) fn report_parent_data_row(
980 sink: Option<LeanWorkerDataSinkTarget<'_>>,
981 row: DataRow,
982) -> Result<(), LeanWorkerError> {
983 let Some(sink) = sink else {
984 return Err(LeanWorkerError::Protocol {
985 message: "worker sent data row for a request without a row sink".to_owned(),
986 });
987 };
988 match sink {
989 LeanWorkerDataSinkTarget::Value(sink) => {
990 let row = LeanWorkerDataRow::try_from(row)?;
991 report_value_data_row(sink, row)
992 }
993 LeanWorkerDataSinkTarget::Raw(sink) => report_raw_data_row(sink, row.into()),
994 }
995}
996
997fn report_value_data_row(sink: &dyn LeanWorkerDataSink, row: LeanWorkerDataRow) -> Result<(), LeanWorkerError> {
998 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
999 let message = if let Some(s) = payload.downcast_ref::<&str>() {
1000 (*s).to_owned()
1001 } else if let Some(s) = payload.downcast_ref::<String>() {
1002 s.clone()
1003 } else {
1004 "worker data sink panicked".to_owned()
1005 };
1006 LeanWorkerError::DataSinkPanic { message }
1007 })
1008}
1009
1010fn report_raw_data_row(sink: &dyn LeanWorkerRawDataSink, row: LeanWorkerRawDataRow) -> Result<(), LeanWorkerError> {
1011 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(row))).map_err(|payload| {
1012 let message = if let Some(s) = payload.downcast_ref::<&str>() {
1013 (*s).to_owned()
1014 } else if let Some(s) = payload.downcast_ref::<String>() {
1015 s.clone()
1016 } else {
1017 "worker data sink panicked".to_owned()
1018 };
1019 LeanWorkerError::DataSinkPanic { message }
1020 })
1021}
1022
1023pub(crate) fn report_parent_diagnostic(
1024 sink: Option<&dyn LeanWorkerDiagnosticSink>,
1025 diagnostic: LeanWorkerDiagnosticEvent,
1026) -> Result<(), LeanWorkerError> {
1027 let Some(sink) = sink else {
1028 return Ok(());
1029 };
1030 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| sink.report(diagnostic))).map_err(|payload| {
1031 let message = if let Some(s) = payload.downcast_ref::<&str>() {
1032 (*s).to_owned()
1033 } else if let Some(s) = payload.downcast_ref::<String>() {
1034 s.clone()
1035 } else {
1036 "worker diagnostic sink panicked".to_owned()
1037 };
1038 LeanWorkerError::DiagnosticSinkPanic { message }
1039 })
1040}
1041
1042pub(crate) fn elapsed_event(
1043 phase: String,
1044 current: u64,
1045 total: Option<u64>,
1046 started: Instant,
1047) -> LeanWorkerProgressEvent {
1048 LeanWorkerProgressEvent {
1049 phase,
1050 current,
1051 total,
1052 elapsed: started.elapsed(),
1053 }
1054}