vortex_file/segments/
source.rs1use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::task;
9use std::task::Context;
10use std::task::Poll;
11
12use futures::FutureExt;
13use futures::StreamExt;
14use futures::channel::mpsc;
15use futures::future;
16use vortex_array::buffer::BufferHandle;
17use vortex_buffer::Alignment;
18use vortex_error::VortexResult;
19use vortex_error::vortex_err;
20use vortex_error::vortex_panic;
21use vortex_io::VortexReadAt;
22use vortex_io::runtime::Handle;
23use vortex_layout::segments::SegmentFuture;
24use vortex_layout::segments::SegmentId;
25use vortex_layout::segments::SegmentSource;
26use vortex_metrics::Counter;
27use vortex_metrics::Histogram;
28use vortex_metrics::Label;
29use vortex_metrics::MetricBuilder;
30use vortex_metrics::MetricsRegistry;
31
32use crate::SegmentSpec;
33use crate::read::IoRequestStream;
34use crate::read::ReadRequest;
35use crate::read::RequestId;
36
37#[derive(Debug)]
38pub enum ReadEvent {
39 Request(ReadRequest),
40 Polled(RequestId),
41 Dropped(RequestId),
42}
43
44pub struct FileSegmentSource {
66 segments: Arc<[SegmentSpec]>,
67 events: mpsc::UnboundedSender<ReadEvent>,
69 next_id: Arc<AtomicUsize>,
71}
72
73impl FileSegmentSource {
74 pub fn open<R: VortexReadAt + Clone>(
75 segments: Arc<[SegmentSpec]>,
76 reader: R,
77 handle: Handle,
78 metrics: RequestMetrics,
79 ) -> Self {
80 let (send, recv) = mpsc::unbounded();
81
82 let max_alignment = segments
83 .iter()
84 .map(|segment| segment.alignment)
85 .max()
86 .unwrap_or_else(Alignment::none);
87 let coalesce_config = reader.coalesce_config().map(|mut config| {
88 let extra = (*max_alignment as u64).saturating_sub(1);
91 config.max_size = config.max_size.saturating_add(extra);
92 config
93 });
94 let concurrency = reader.concurrency();
95 if concurrency == 0 {
96 vortex_panic!(
97 "VortexReadAt::concurrency returned 0 (uri={:?}); this would stall I/O",
98 reader.uri()
99 );
100 }
101
102 let stream = IoRequestStream::new(
103 StreamExt::boxed(recv),
104 coalesce_config,
105 max_alignment,
106 metrics,
107 )
108 .boxed();
109
110 let drive_fut = async move {
111 stream
112 .map(move |req| {
113 let reader = reader.clone();
114 async move {
115 let result = reader
116 .read_at(req.offset(), req.len(), req.alignment())
117 .await;
118 req.resolve(result);
119 }
120 })
121 .buffer_unordered(concurrency)
122 .collect::<()>()
123 .await
124 };
125
126 handle.spawn(drive_fut).detach();
127
128 Self {
129 segments,
130 events: send,
131 next_id: Arc::new(AtomicUsize::new(0)),
132 }
133 }
134}
135
136impl SegmentSource for FileSegmentSource {
137 fn request(&self, id: SegmentId) -> SegmentFuture {
138 let spec = match self.segments.get(*id as usize).cloned() {
141 Some(spec) => spec,
142 None => {
143 return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
144 }
145 };
146
147 let SegmentSpec {
148 offset,
149 length,
150 alignment,
151 } = spec;
152
153 let (send, recv) = oneshot::channel();
154 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
155 let event = ReadEvent::Request(ReadRequest {
156 id,
157 offset,
158 length: length as usize,
159 alignment,
160 callback: send,
161 });
162
163 if let Err(e) = self.events.unbounded_send(event) {
165 return future::ready(Err(vortex_err!("Failed to submit read request: {e}"))).boxed();
166 }
167
168 let fut = ReadFuture {
169 id,
170 recv,
171 polled: false,
172 events: self.events.clone(),
173 };
174
175 fut.boxed()
177 }
178}
179
180struct ReadFuture {
185 id: usize,
186 recv: oneshot::Receiver<VortexResult<BufferHandle>>,
187 polled: bool,
188 events: mpsc::UnboundedSender<ReadEvent>,
189}
190
191impl Future for ReadFuture {
192 type Output = VortexResult<BufferHandle>;
193
194 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195 if !self.polled {
196 self.polled = true;
197 if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) {
199 return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}")));
200 }
201 }
202
203 match task::ready!(self.recv.poll_unpin(cx)) {
204 Ok(result) => Poll::Ready(result),
205 Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
206 }
207 }
208}
209
210impl Drop for ReadFuture {
211 fn drop(&mut self) {
212 drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
215 }
216}
217
218pub struct RequestMetrics {
219 pub individual_requests: Counter,
220 pub coalesced_requests: Counter,
221 pub num_requests_coalesced: Histogram,
222}
223
224impl RequestMetrics {
225 pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec<Label>) -> Self {
226 Self {
227 individual_requests: MetricBuilder::new(metrics_registry)
228 .add_labels(labels.clone())
229 .counter("io.requests.individual"),
230 coalesced_requests: MetricBuilder::new(metrics_registry)
231 .add_labels(labels.clone())
232 .counter("io.requests.coalesced"),
233 num_requests_coalesced: MetricBuilder::new(metrics_registry)
234 .add_labels(labels)
235 .histogram("io.requests.coalesced.num_coalesced"),
236 }
237 }
238}