vortex_layout/segments/
events.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::{Debug, Formatter};
5use std::pin::Pin;
6use std::sync::atomic::AtomicBool;
7use std::sync::{Arc, atomic};
8use std::task::{Context, Poll};
9
10use dashmap::{DashMap, Entry};
11use futures::channel::{mpsc, oneshot};
12use futures::future::{BoxFuture, Shared, WeakShared};
13use futures::stream::BoxStream;
14use futures::{FutureExt, StreamExt, TryFutureExt};
15use vortex_buffer::ByteBuffer;
16use vortex_error::{
17    ResultExt, SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_err,
18};
19
20use crate::segments::{SegmentId, SegmentSource};
21
22/// A utility for turning a [`SegmentSource`] into a stream of [`SegmentEvent`]s.
23///
24/// Also performs de-duplication of requests for the same segment, while tracking when the all
25/// requesters have been dropped.
26pub struct SegmentEvents {
27    pending: DashMap<SegmentId, PendingSegment>,
28    events: mpsc::UnboundedSender<SegmentEvent>,
29}
30
31impl SegmentEvents {
32    pub fn create() -> (Arc<dyn SegmentSource>, BoxStream<'static, SegmentEvent>) {
33        let (send, recv) = mpsc::unbounded();
34
35        let events = Arc::new(Self {
36            pending: Default::default(),
37            events: send,
38        });
39
40        let source = Arc::new(EventsSegmentSource { events });
41        let stream = recv.boxed();
42
43        (source, stream)
44    }
45}
46
47pub enum SegmentEvent {
48    Requested(SegmentRequest),
49    Polled(SegmentId),
50    Dropped(SegmentId),
51    Resolved(SegmentId),
52}
53
54impl Debug for SegmentEvent {
55    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56        match self {
57            SegmentEvent::Requested(req) => write!(
58                f,
59                "SegmentEvent::Registered({:?}, {})",
60                req.id, req.for_whom
61            ),
62            SegmentEvent::Polled(id) => write!(f, "SegmentEvent::Polled({id:?})"),
63            SegmentEvent::Dropped(id) => write!(f, "SegmentEvent::Dropped({id:?})"),
64            SegmentEvent::Resolved(id) => write!(f, "SegmentEvent::Resolved({id:?})"),
65        }
66    }
67}
68
69pub struct SegmentRequest {
70    /// The ID of the requested segment
71    id: SegmentId,
72    /// The name of the layout that requested the segment, used only for debugging.
73    for_whom: Arc<str>,
74    /// The one-shot channel to send the segment back to the caller
75    callback: oneshot::Sender<VortexResult<ByteBuffer>>,
76    /// The segment events that we post our resolved event back to.
77    events: Arc<SegmentEvents>,
78}
79
80impl SegmentRequest {
81    pub fn id(&self) -> SegmentId {
82        self.id
83    }
84
85    // Helper method to know who requested the segment.
86    // Hidden from the documented API as this is only used for logging in the vortex-file crate.
87    #[doc(hidden)]
88    pub fn for_whom(&self) -> &str {
89        &self.for_whom
90    }
91
92    /// Resolve the segment request with the given buffer result.
93    pub fn resolve(self, buffer: VortexResult<ByteBuffer>) {
94        self.events.submit_event(SegmentEvent::Resolved(self.id));
95        if self.callback.send(buffer).is_err() {
96            // The callback may fail if the caller was dropped while the request was in-flight, as
97            // may be the case with pre-fetched segments.
98            log::debug!("Segment {} dropped while request in-flight", self.id);
99        }
100    }
101}
102
103impl SegmentEvents {
104    /// Get or create a segment future for the given segment ID.
105    fn segment_future(
106        self: Arc<Self>,
107        id: SegmentId,
108        for_whom: Arc<str>,
109    ) -> Shared<SegmentEventsFuture> {
110        loop {
111            // Loop in case the pending future has no strong references, in which case we clear it
112            // out of the map and create a new one on the next iteration.
113            match self.pending.entry(id) {
114                Entry::Occupied(e) => {
115                    if let Some(fut) = e.get().future() {
116                        return fut;
117                    } else {
118                        log::debug!("Re-requesting dropped segment from segment reader {id}");
119                        e.remove();
120                    }
121                }
122                Entry::Vacant(e) => {
123                    let fut = SegmentEventsFuture::new(id, for_whom, self.clone()).shared();
124                    // Create a new pending segment with a weak reference to the future.
125                    e.insert(PendingSegment {
126                        id,
127                        fut: fut
128                            .downgrade()
129                            .vortex_expect("cannot fail, only just created"),
130                    });
131                    return fut;
132                }
133            }
134        }
135    }
136
137    /// Submit a segment event.
138    fn submit_event(&self, event: SegmentEvent) {
139        if self.events.unbounded_send(event).is_err() {
140            log::trace!("Segment queue shutting down, no problem if we lose events")
141        }
142    }
143}
144
145struct EventsSegmentSource {
146    events: Arc<SegmentEvents>,
147}
148
149impl SegmentSource for EventsSegmentSource {
150    fn request(
151        &self,
152        id: SegmentId,
153        for_whom: &Arc<str>,
154    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
155        self.events
156            .clone()
157            .segment_future(id, for_whom.clone())
158            .map_err(VortexError::from)
159            .boxed()
160    }
161}
162
163/// A pending segment returned by the [`SegmentSource`].
164struct PendingSegment {
165    id: SegmentId,
166    /// A weak shared future that we hand out to all requesters. Once all requesters have been
167    /// dropped, typically because their row split has completed (or been pruned), then the weak
168    /// future is no longer upgradable, and the segment can be dropped.
169    fut: WeakShared<SegmentEventsFuture>,
170}
171
172impl Debug for PendingSegment {
173    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
174        f.debug_struct("PendingSegment")
175            .field("id", &self.id)
176            .finish()
177    }
178}
179
180impl PendingSegment {
181    /// Create a new future resolving this segment, provided the segment is still alive.
182    fn future(&self) -> Option<Shared<SegmentEventsFuture>> {
183        self.fut.upgrade()
184    }
185}
186
187/// A future that notifies the segment queue when it is first polled, as well as logging
188/// when it is dropped.
189struct SegmentEventsFuture {
190    future: BoxFuture<'static, SharedVortexResult<ByteBuffer>>,
191    id: SegmentId,
192    source: Arc<SegmentEvents>,
193    polled: AtomicBool,
194}
195
196impl SegmentEventsFuture {
197    fn new(id: SegmentId, for_whom: Arc<str>, events: Arc<SegmentEvents>) -> Self {
198        let (send, recv) = oneshot::channel::<VortexResult<ByteBuffer>>();
199
200        // Set up the segment future tied to the recv end of the channel.
201        let this = SegmentEventsFuture {
202            future: recv
203                .map_err(|e| vortex_err!("pending segment receiver dropped: {}", e))
204                .map(|r| r.unnest())
205                .map_err(Arc::new)
206                .boxed(),
207            id,
208            source: events.clone(),
209            polled: AtomicBool::new(false),
210        };
211
212        // Set up a SegmentRequest tied to the send end of the channel.
213        events.submit_event(SegmentEvent::Requested(SegmentRequest {
214            id,
215            for_whom,
216            callback: send,
217            events: events.clone(),
218        }));
219
220        this
221    }
222}
223
224impl Future for SegmentEventsFuture {
225    type Output = SharedVortexResult<ByteBuffer>;
226
227    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228        if !self.polled.fetch_or(true, atomic::Ordering::Relaxed) {
229            self.source.submit_event(SegmentEvent::Polled(self.id));
230        }
231        self.future.poll_unpin(cx)
232    }
233}
234
235impl Drop for SegmentEventsFuture {
236    fn drop(&mut self) {
237        self.source.submit_event(SegmentEvent::Dropped(self.id));
238    }
239}