vortex_layout/segments/
events.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::{Debug, Formatter};
5use std::pin::Pin;
6use std::sync::atomic::AtomicBool;
7use std::sync::{Arc, atomic};
8use std::task::{Context, Poll};
9
10use dashmap::{DashMap, Entry};
11use futures::channel::{mpsc, oneshot};
12use futures::future::{BoxFuture, Shared, WeakShared};
13use futures::stream::BoxStream;
14use futures::{FutureExt, StreamExt, TryFutureExt};
15use vortex_buffer::ByteBuffer;
16use vortex_error::{SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_err};
17
18use crate::segments::{SegmentFuture, SegmentId, SegmentSource};
19
20/// A utility for turning a [`SegmentSource`] into a stream of [`SegmentEvent`]s.
21///
22/// Also performs de-duplication of requests for the same segment, while tracking when the all
23/// requesters have been dropped.
24pub struct SegmentEvents {
25    pending: DashMap<SegmentId, PendingSegment>,
26    events: mpsc::UnboundedSender<SegmentEvent>,
27}
28
29impl SegmentEvents {
30    pub fn create() -> (Arc<dyn SegmentSource>, BoxStream<'static, SegmentEvent>) {
31        let (send, recv) = mpsc::unbounded();
32
33        let events = Arc::new(Self {
34            pending: Default::default(),
35            events: send,
36        });
37
38        let source = Arc::new(EventsSegmentSource { events });
39        let stream = recv.boxed();
40
41        (source, stream)
42    }
43}
44
45pub enum SegmentEvent {
46    Requested(SegmentRequest),
47    Polled(SegmentId),
48    Dropped(SegmentId),
49    Resolved(SegmentId),
50}
51
52impl Debug for SegmentEvent {
53    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54        match self {
55            SegmentEvent::Requested(req) => write!(f, "SegmentEvent::Registered({:?})", req.id),
56            SegmentEvent::Polled(id) => write!(f, "SegmentEvent::Polled({id:?})"),
57            SegmentEvent::Dropped(id) => write!(f, "SegmentEvent::Dropped({id:?})"),
58            SegmentEvent::Resolved(id) => write!(f, "SegmentEvent::Resolved({id:?})"),
59        }
60    }
61}
62
63pub struct SegmentRequest {
64    /// The ID of the requested segment
65    id: SegmentId,
66    /// The one-shot channel to send the segment back to the caller
67    callback: oneshot::Sender<VortexResult<ByteBuffer>>,
68    /// The segment events that we post our resolved event back to.
69    events: Arc<SegmentEvents>,
70}
71
72impl SegmentRequest {
73    pub fn id(&self) -> SegmentId {
74        self.id
75    }
76
77    /// Resolve the segment request with the given buffer result.
78    pub fn resolve(self, buffer: VortexResult<ByteBuffer>) {
79        self.events.submit_event(SegmentEvent::Resolved(self.id));
80        if self.callback.send(buffer).is_err() {
81            // The callback may fail if the caller was dropped while the request was in-flight, as
82            // may be the case with pre-fetched segments. This is expected behavior and not an error.
83            log::trace!(
84                "Segment {} receiver dropped while request in-flight (expected for pre-fetched segments)",
85                self.id
86            );
87        }
88    }
89}
90
91impl SegmentEvents {
92    /// Get or create a segment future for the given segment ID.
93    fn segment_future(self: Arc<Self>, id: SegmentId) -> Shared<SegmentEventsFuture> {
94        loop {
95            // Loop in case the pending future has no strong references, in which case we clear it
96            // out of the map and create a new one on the next iteration.
97            match self.pending.entry(id) {
98                Entry::Occupied(e) => {
99                    if let Some(fut) = e.get().future() {
100                        return fut;
101                    } else {
102                        log::debug!("Re-requesting dropped segment from segment reader {id}");
103                        e.remove();
104                    }
105                }
106                Entry::Vacant(e) => {
107                    let fut = SegmentEventsFuture::new(id, self.clone()).shared();
108                    // Create a new pending segment with a weak reference to the future.
109                    e.insert(PendingSegment {
110                        id,
111                        fut: fut
112                            .downgrade()
113                            .vortex_expect("cannot fail, only just created"),
114                    });
115                    return fut;
116                }
117            }
118        }
119    }
120
121    /// Submit a segment event.
122    fn submit_event(&self, event: SegmentEvent) {
123        if self.events.unbounded_send(event).is_err() {
124            log::trace!("Segment queue shutting down, no problem if we lose events")
125        }
126    }
127}
128
129struct EventsSegmentSource {
130    events: Arc<SegmentEvents>,
131}
132
133impl SegmentSource for EventsSegmentSource {
134    fn request(&self, id: SegmentId) -> SegmentFuture {
135        self.events
136            .clone()
137            .segment_future(id)
138            .map_err(VortexError::from)
139            .boxed()
140    }
141}
142
143/// A pending segment returned by the [`SegmentSource`].
144struct PendingSegment {
145    id: SegmentId,
146    /// A weak shared future that we hand out to all requesters. Once all requesters have been
147    /// dropped, typically because their row split has completed (or been pruned), then the weak
148    /// future is no longer upgradable, and the segment can be dropped.
149    fut: WeakShared<SegmentEventsFuture>,
150}
151
152impl Debug for PendingSegment {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("PendingSegment")
155            .field("id", &self.id)
156            .finish()
157    }
158}
159
160impl PendingSegment {
161    /// Create a new future resolving this segment, provided the segment is still alive.
162    fn future(&self) -> Option<Shared<SegmentEventsFuture>> {
163        self.fut.upgrade()
164    }
165}
166
167/// A future that notifies the segment queue when it is first polled, as well as logging
168/// when it is dropped.
169struct SegmentEventsFuture {
170    future: BoxFuture<'static, SharedVortexResult<ByteBuffer>>,
171    id: SegmentId,
172    source: Arc<SegmentEvents>,
173    polled: AtomicBool,
174}
175
176impl SegmentEventsFuture {
177    fn new(id: SegmentId, events: Arc<SegmentEvents>) -> Self {
178        let (send, recv) = oneshot::channel::<VortexResult<ByteBuffer>>();
179
180        // Set up the segment future tied to the recv end of the channel.
181        let this = SegmentEventsFuture {
182            future: recv
183                .map_err(|e| vortex_err!("pending segment receiver dropped: {}", e))
184                .map(|r| r.flatten())
185                .map_err(Arc::new)
186                .boxed(),
187            id,
188            source: events.clone(),
189            polled: AtomicBool::new(false),
190        };
191
192        // Set up a SegmentRequest tied to the send end of the channel.
193        events.submit_event(SegmentEvent::Requested(SegmentRequest {
194            id,
195            callback: send,
196            events: events.clone(),
197        }));
198
199        this
200    }
201}
202
203impl Future for SegmentEventsFuture {
204    type Output = SharedVortexResult<ByteBuffer>;
205
206    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207        if !self.polled.fetch_or(true, atomic::Ordering::Relaxed) {
208            self.source.submit_event(SegmentEvent::Polled(self.id));
209        }
210        self.future.poll_unpin(cx)
211    }
212}
213
214impl Drop for SegmentEventsFuture {
215    fn drop(&mut self) {
216        self.source.submit_event(SegmentEvent::Dropped(self.id));
217    }
218}