vortex_file/segments/
source.rs1use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::task::Context;
9use std::task::Poll;
10
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::channel::mpsc;
14use futures::future;
15use vortex_array::buffer::BufferHandle;
16use vortex_buffer::Alignment;
17use vortex_error::VortexResult;
18use vortex_error::vortex_err;
19use vortex_error::vortex_panic;
20use vortex_io::VortexReadAt;
21use vortex_io::runtime::Handle;
22use vortex_layout::segments::SegmentFuture;
23use vortex_layout::segments::SegmentId;
24use vortex_layout::segments::SegmentSource;
25use vortex_metrics::Counter;
26use vortex_metrics::Histogram;
27use vortex_metrics::Label;
28use vortex_metrics::MetricBuilder;
29use vortex_metrics::MetricsRegistry;
30
31use crate::SegmentSpec;
32use crate::read::IoRequestStream;
33use crate::read::ReadRequest;
34use crate::read::RequestId;
35
36#[derive(Debug)]
37pub enum ReadEvent {
38 Request(ReadRequest),
39 Polled(RequestId),
40 Dropped(RequestId),
41}
42
43pub struct FileSegmentSource {
65 segments: Arc<[SegmentSpec]>,
66 events: mpsc::UnboundedSender<ReadEvent>,
68 next_id: Arc<AtomicUsize>,
70}
71
72impl FileSegmentSource {
73 pub fn open<R: VortexReadAt + Clone>(
74 segments: Arc<[SegmentSpec]>,
75 reader: R,
76 handle: Handle,
77 metrics: RequestMetrics,
78 ) -> Self {
79 let (send, recv) = mpsc::unbounded();
80
81 let max_alignment = segments
82 .iter()
83 .map(|segment| segment.alignment)
84 .max()
85 .unwrap_or_else(Alignment::none);
86 let coalesce_config = reader.coalesce_config().map(|mut config| {
87 let extra = (*max_alignment as u64).saturating_sub(1);
90 config.max_size = config.max_size.saturating_add(extra);
91 config
92 });
93 let concurrency = reader.concurrency();
94 if concurrency == 0 {
95 vortex_panic!(
96 "VortexReadAt::concurrency returned 0 (uri={:?}); this would stall I/O",
97 reader.uri()
98 );
99 }
100
101 let stream = IoRequestStream::new(
102 StreamExt::boxed(recv),
103 coalesce_config,
104 max_alignment,
105 metrics,
106 )
107 .boxed();
108
109 let drive_fut = async move {
110 stream
111 .map(move |req| {
112 let reader = reader.clone();
113 async move {
114 let result = reader
115 .read_at(req.offset(), req.len(), req.alignment())
116 .await;
117 req.resolve(result);
118 }
119 })
120 .buffer_unordered(concurrency)
121 .collect::<()>()
122 .await
123 };
124
125 handle.spawn(drive_fut).detach();
126
127 Self {
128 segments,
129 events: send,
130 next_id: Arc::new(AtomicUsize::new(0)),
131 }
132 }
133}
134
135impl SegmentSource for FileSegmentSource {
136 fn request(&self, id: SegmentId) -> SegmentFuture {
137 let spec = match self.segments.get(*id as usize).cloned() {
140 Some(spec) => spec,
141 None => {
142 return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
143 }
144 };
145
146 let SegmentSpec {
147 offset,
148 length,
149 alignment,
150 } = spec;
151
152 let (send, recv) = oneshot::channel();
153 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
154 let event = ReadEvent::Request(ReadRequest {
155 id,
156 offset,
157 length: length as usize,
158 alignment,
159 callback: send,
160 });
161
162 if let Err(e) = self.events.unbounded_send(event) {
164 return future::ready(Err(vortex_err!("Failed to submit read request: {e}"))).boxed();
165 }
166
167 let fut = ReadFuture {
168 id,
169 recv,
170 polled: false,
171 finished: false,
172 events: self.events.clone(),
173 };
174
175 fut.boxed()
177 }
178}
179
180struct ReadFuture {
185 id: usize,
186 recv: oneshot::Receiver<VortexResult<BufferHandle>>,
187 polled: bool,
188 finished: bool,
189 events: mpsc::UnboundedSender<ReadEvent>,
190}
191
192impl Future for ReadFuture {
193 type Output = VortexResult<BufferHandle>;
194
195 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196 match self.recv.poll_unpin(cx) {
197 Poll::Ready(result) => {
198 self.finished = true;
199 Poll::Ready(
203 result.unwrap_or_else(|e| {
204 Err(vortex_err!("ReadRequest dropped by runtime: {e}"))
205 }),
206 )
207 }
208 Poll::Pending if !self.polled => {
209 self.polled = true;
210 match self.events.unbounded_send(ReadEvent::Polled(self.id)) {
212 Ok(()) => Poll::Pending,
213 Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
214 }
215 }
216 _ => Poll::Pending,
217 }
218 }
219}
220
221impl Drop for ReadFuture {
222 fn drop(&mut self) {
223 if self.finished {
225 return;
226 }
227
228 drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
230 }
231}
232
233pub struct RequestMetrics {
234 pub individual_requests: Counter,
235 pub coalesced_requests: Counter,
236 pub num_requests_coalesced: Histogram,
237}
238
239impl RequestMetrics {
240 pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec<Label>) -> Self {
241 Self {
242 individual_requests: MetricBuilder::new(metrics_registry)
243 .add_labels(labels.clone())
244 .counter("io.requests.individual"),
245 coalesced_requests: MetricBuilder::new(metrics_registry)
246 .add_labels(labels.clone())
247 .counter("io.requests.coalesced"),
248 num_requests_coalesced: MetricBuilder::new(metrics_registry)
249 .add_labels(labels)
250 .histogram("io.requests.coalesced.num_coalesced"),
251 }
252 }
253}