vortex_layout/segments/events.rs

1use std::fmt::{Debug, Formatter};
2use std::pin::Pin;
3use std::sync::atomic::AtomicBool;
4use std::sync::{Arc, atomic};
5use std::task::{Context, Poll};
6
7use dashmap::{DashMap, Entry};
8use futures::channel::{mpsc, oneshot};
9use futures::future::{BoxFuture, Shared, WeakShared};
10use futures::stream::BoxStream;
11use futures::{FutureExt, StreamExt, TryFutureExt};
12use vortex_buffer::ByteBuffer;
13use vortex_error::{
14    ResultExt, SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_err,
15};
16
17use crate::segments::{SegmentId, SegmentSource};
18
19/// A utility for turning a [`SegmentSource`] into a stream of [`SegmentEvent`]s.
20///
21/// Also performs de-duplication of requests for the same segment, while tracking when the all
22/// requesters have been dropped.
23pub struct SegmentEvents {
24    pending: DashMap<SegmentId, PendingSegment>,
25    events: mpsc::UnboundedSender<SegmentEvent>,
26}
27
28impl SegmentEvents {
29    pub fn create() -> (Arc<dyn SegmentSource>, BoxStream<'static, SegmentEvent>) {
30        let (send, recv) = mpsc::unbounded();
31
32        let events = Arc::new(Self {
33            pending: Default::default(),
34            events: send,
35        });
36
37        let source = Arc::new(EventsSegmentSource { events });
38        let stream = recv.boxed();
39
40        (source, stream)
41    }
42}
43
44pub enum SegmentEvent {
45    Requested(SegmentRequest),
46    Polled(SegmentId),
47    Dropped(SegmentId),
48    Resolved(SegmentId),
49}
50
51impl Debug for SegmentEvent {
52    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53        match self {
54            SegmentEvent::Requested(req) => write!(
55                f,
56                "SegmentEvent::Registered({:?}, {})",
57                req.id, req.for_whom
58            ),
59            SegmentEvent::Polled(id) => write!(f, "SegmentEvent::Polled({:?})", id),
60            SegmentEvent::Dropped(id) => write!(f, "SegmentEvent::Dropped({:?})", id),
61            SegmentEvent::Resolved(id) => write!(f, "SegmentEvent::Resolved({:?})", id),
62        }
63    }
64}
65
66pub struct SegmentRequest {
67    /// The ID of the requested segment
68    id: SegmentId,
69    /// The name of the layout that requested the segment, used only for debugging.
70    for_whom: Arc<str>,
71    /// The one-shot channel to send the segment back to the caller
72    callback: oneshot::Sender<VortexResult<ByteBuffer>>,
73    /// The segment events that we post our resolved event back to.
74    events: Arc<SegmentEvents>,
75}
76
77impl SegmentRequest {
78    pub fn id(&self) -> SegmentId {
79        self.id
80    }
81
82    /// Resolve the segment request with the given buffer result.
83    pub fn resolve(self, buffer: VortexResult<ByteBuffer>) {
84        self.events.submit_event(SegmentEvent::Resolved(self.id));
85        if self.callback.send(buffer).is_err() {
86            // The callback may fail if the caller was dropped while the request was in-flight, as
87            // may be the case with pre-fetched segments.
88            log::debug!("Segment {} dropped while request in-flight", self.id);
89        }
90    }
91}
92
93impl SegmentEvents {
94    /// Get or create a segment future for the given segment ID.
95    fn segment_future(
96        self: Arc<Self>,
97        id: SegmentId,
98        for_whom: Arc<str>,
99    ) -> Shared<SegmentEventsFuture> {
100        loop {
101            // Loop in case the pending future has no strong references, in which case we clear it
102            // out of the map and create a new one on the next iteration.
103            match self.pending.entry(id) {
104                Entry::Occupied(e) => {
105                    if let Some(fut) = e.get().future() {
106                        return fut;
107                    } else {
108                        log::debug!("Re-requesting dropped segment from segment reader {}", id);
109                        e.remove();
110                    }
111                }
112                Entry::Vacant(e) => {
113                    let fut = SegmentEventsFuture::new(id, for_whom, self.clone()).shared();
114                    // Create a new pending segment with a weak reference to the future.
115                    e.insert(PendingSegment {
116                        id,
117                        fut: fut
118                            .downgrade()
119                            .vortex_expect("cannot fail, only just created"),
120                    });
121                    return fut;
122                }
123            }
124        }
125    }
126
127    /// Submit a segment event.
128    fn submit_event(&self, event: SegmentEvent) {
129        if self.events.unbounded_send(event).is_err() {
130            log::trace!("Segment queue shutting down, no problem if we lose events")
131        }
132    }
133}
134
135struct EventsSegmentSource {
136    events: Arc<SegmentEvents>,
137}
138
139impl SegmentSource for EventsSegmentSource {
140    fn request(
141        &self,
142        id: SegmentId,
143        for_whom: &Arc<str>,
144    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
145        self.events
146            .clone()
147            .segment_future(id, for_whom.clone())
148            .map_err(VortexError::from)
149            .boxed()
150    }
151}
152
153/// A pending segment returned by the [`SegmentSource`].
154pub struct PendingSegment {
155    id: SegmentId,
156    /// A weak shared future that we hand out to all requesters. Once all requesters have been
157    /// dropped, typically because their row split has completed (or been pruned), then the weak
158    /// future is no longer upgradable, and the segment can be dropped.
159    fut: WeakShared<SegmentEventsFuture>,
160}
161
162impl Debug for PendingSegment {
163    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("PendingSegment")
165            .field("id", &self.id)
166            .finish()
167    }
168}
169
170impl PendingSegment {
171    /// Create a new future resolving this segment, provided the segment is still alive.
172    fn future(&self) -> Option<Shared<SegmentEventsFuture>> {
173        self.fut.upgrade()
174    }
175}
176
177/// A future that notifies the segment queue when it is first polled, as well as logging
178/// when it is dropped.
179struct SegmentEventsFuture {
180    future: BoxFuture<'static, SharedVortexResult<ByteBuffer>>,
181    id: SegmentId,
182    source: Arc<SegmentEvents>,
183    polled: AtomicBool,
184}
185
186impl SegmentEventsFuture {
187    fn new(id: SegmentId, for_whom: Arc<str>, events: Arc<SegmentEvents>) -> Self {
188        let (send, recv) = oneshot::channel::<VortexResult<ByteBuffer>>();
189
190        // Set up the segment future tied to the recv end of the channel.
191        let this = SegmentEventsFuture {
192            future: recv
193                .map_err(|e| vortex_err!("pending segment receiver dropped: {}", e))
194                .map(|r| r.unnest())
195                .map_err(Arc::new)
196                .boxed(),
197            id,
198            source: events.clone(),
199            polled: AtomicBool::new(false),
200        };
201
202        // Set up a SegmentRequest tied to the send end of the channel.
203        events.submit_event(SegmentEvent::Requested(SegmentRequest {
204            id,
205            for_whom,
206            callback: send,
207            events: events.clone(),
208        }));
209
210        this
211    }
212}
213
214impl Future for SegmentEventsFuture {
215    type Output = SharedVortexResult<ByteBuffer>;
216
217    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
218        if !self.polled.fetch_or(true, atomic::Ordering::Relaxed) {
219            self.source.submit_event(SegmentEvent::Polled(self.id));
220        }
221        self.future.poll_unpin(cx)
222    }
223}
224
225impl Drop for SegmentEventsFuture {
226    fn drop(&mut self) {
227        self.source.submit_event(SegmentEvent::Dropped(self.id));
228    }
229}