lance_core/utils/
tracing.rs1use futures::Stream;
5use pin_project::pin_project;
6use tracing::Span;
7
8#[pin_project]
9pub struct InstrumentedStream<I: Stream> {
10 #[pin]
11 stream: I,
12 span: Span,
13}
14
15impl<I: Stream> Stream for InstrumentedStream<I> {
16 type Item = I::Item;
17
18 fn poll_next(
19 self: std::pin::Pin<&mut Self>,
20 cx: &mut std::task::Context<'_>,
21 ) -> std::task::Poll<Option<Self::Item>> {
22 let this = self.project();
23 let _guard = this.span.enter();
24 this.stream.poll_next(cx)
25 }
26}
27
28pub trait StreamTracingExt {
31 fn stream_in_current_span(self) -> InstrumentedStream<Self>
33 where
34 Self: Stream,
35 Self: Sized;
36}
37
38impl<S: Stream> StreamTracingExt for S {
39 fn stream_in_current_span(self) -> InstrumentedStream<Self>
40 where
41 Self: Stream,
42 Self: Sized,
43 {
44 InstrumentedStream {
45 stream: self,
46 span: Span::current(),
47 }
48 }
49}
50
// Shared string constants for tracing.
//
// NOTE(review): no usage is visible in this file. The names suggest the
// `TRACE_*` values are tracing targets and the rest are values attached to
// events; confirm against the call sites before relying on that.

// --- File audit ---

/// String identifier `"lance::file_audit"`.
pub const TRACE_FILE_AUDIT: &str = "lance::file_audit";

/// Audit mode string `"create"`.
pub const AUDIT_MODE_CREATE: &str = "create";
/// Audit mode string `"delete"`.
pub const AUDIT_MODE_DELETE: &str = "delete";
/// Audit mode string `"delete_unverified"`.
pub const AUDIT_MODE_DELETE_UNVERIFIED: &str = "delete_unverified";

/// Audit type string `"deletion"`.
pub const AUDIT_TYPE_DELETION: &str = "deletion";
/// Audit type string `"manifest"`.
pub const AUDIT_TYPE_MANIFEST: &str = "manifest";
/// Audit type string `"index"`.
pub const AUDIT_TYPE_INDEX: &str = "index";
/// Audit type string `"data"`.
pub const AUDIT_TYPE_DATA: &str = "data";

/// String `"create"`; same value as [`AUDIT_MODE_CREATE`].
pub const TRACE_FILE_CREATE: &str = "create";

// --- I/O events ---

/// String identifier `"lance::io_events"`.
pub const TRACE_IO_EVENTS: &str = "lance::io_events";

/// I/O type string `"open_scalar_index"`.
pub const IO_TYPE_OPEN_SCALAR: &str = "open_scalar_index";
/// I/O type string `"open_vector_index"`.
pub const IO_TYPE_OPEN_VECTOR: &str = "open_vector_index";
/// I/O type string `"load_vector_part"`.
pub const IO_TYPE_LOAD_VECTOR_PART: &str = "load_vector_part";
/// I/O type string `"load_scalar_part"`.
pub const IO_TYPE_LOAD_SCALAR_PART: &str = "load_scalar_part";

// --- Execution ---

/// String identifier `"lance::execution"`.
pub const TRACE_EXECUTION: &str = "lance::execution";
/// Execution event string `"plan_run"`.
pub const EXECUTION_PLAN_RUN: &str = "plan_run";