vortex_layout/segments/
events.rs1use std::fmt::{Debug, Formatter};
5use std::pin::Pin;
6use std::sync::atomic::AtomicBool;
7use std::sync::{Arc, atomic};
8use std::task::{Context, Poll};
9
10use dashmap::{DashMap, Entry};
11use futures::channel::{mpsc, oneshot};
12use futures::future::{BoxFuture, Shared, WeakShared};
13use futures::stream::BoxStream;
14use futures::{FutureExt, StreamExt, TryFutureExt};
15use vortex_buffer::ByteBuffer;
16use vortex_error::{SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_err};
17
18use crate::segments::{SegmentFuture, SegmentId, SegmentSource};
19
20pub struct SegmentEvents {
25 pending: DashMap<SegmentId, PendingSegment>,
26 events: mpsc::UnboundedSender<SegmentEvent>,
27}
28
29impl SegmentEvents {
30 pub fn create() -> (Arc<dyn SegmentSource>, BoxStream<'static, SegmentEvent>) {
31 let (send, recv) = mpsc::unbounded();
32
33 let events = Arc::new(Self {
34 pending: Default::default(),
35 events: send,
36 });
37
38 let source = Arc::new(EventsSegmentSource { events });
39 let stream = recv.boxed();
40
41 (source, stream)
42 }
43}
44
45pub enum SegmentEvent {
46 Requested(SegmentRequest),
47 Polled(SegmentId),
48 Dropped(SegmentId),
49 Resolved(SegmentId),
50}
51
52impl Debug for SegmentEvent {
53 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54 match self {
55 SegmentEvent::Requested(req) => write!(f, "SegmentEvent::Registered({:?})", req.id),
56 SegmentEvent::Polled(id) => write!(f, "SegmentEvent::Polled({id:?})"),
57 SegmentEvent::Dropped(id) => write!(f, "SegmentEvent::Dropped({id:?})"),
58 SegmentEvent::Resolved(id) => write!(f, "SegmentEvent::Resolved({id:?})"),
59 }
60 }
61}
62
63pub struct SegmentRequest {
64 id: SegmentId,
66 callback: oneshot::Sender<VortexResult<ByteBuffer>>,
68 events: Arc<SegmentEvents>,
70}
71
72impl SegmentRequest {
73 pub fn id(&self) -> SegmentId {
74 self.id
75 }
76
77 pub fn resolve(self, buffer: VortexResult<ByteBuffer>) {
79 self.events.submit_event(SegmentEvent::Resolved(self.id));
80 if self.callback.send(buffer).is_err() {
81 log::trace!(
84 "Segment {} receiver dropped while request in-flight (expected for pre-fetched segments)",
85 self.id
86 );
87 }
88 }
89}
90
91impl SegmentEvents {
92 fn segment_future(self: Arc<Self>, id: SegmentId) -> Shared<SegmentEventsFuture> {
94 loop {
95 match self.pending.entry(id) {
98 Entry::Occupied(e) => {
99 if let Some(fut) = e.get().future() {
100 return fut;
101 } else {
102 log::debug!("Re-requesting dropped segment from segment reader {id}");
103 e.remove();
104 }
105 }
106 Entry::Vacant(e) => {
107 let fut = SegmentEventsFuture::new(id, self.clone()).shared();
108 e.insert(PendingSegment {
110 id,
111 fut: fut
112 .downgrade()
113 .vortex_expect("cannot fail, only just created"),
114 });
115 return fut;
116 }
117 }
118 }
119 }
120
121 fn submit_event(&self, event: SegmentEvent) {
123 if self.events.unbounded_send(event).is_err() {
124 log::trace!("Segment queue shutting down, no problem if we lose events")
125 }
126 }
127}
128
129struct EventsSegmentSource {
130 events: Arc<SegmentEvents>,
131}
132
133impl SegmentSource for EventsSegmentSource {
134 fn request(&self, id: SegmentId) -> SegmentFuture {
135 self.events
136 .clone()
137 .segment_future(id)
138 .map_err(VortexError::from)
139 .boxed()
140 }
141}
142
143struct PendingSegment {
145 id: SegmentId,
146 fut: WeakShared<SegmentEventsFuture>,
150}
151
152impl Debug for PendingSegment {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("PendingSegment")
155 .field("id", &self.id)
156 .finish()
157 }
158}
159
160impl PendingSegment {
161 fn future(&self) -> Option<Shared<SegmentEventsFuture>> {
163 self.fut.upgrade()
164 }
165}
166
167struct SegmentEventsFuture {
170 future: BoxFuture<'static, SharedVortexResult<ByteBuffer>>,
171 id: SegmentId,
172 source: Arc<SegmentEvents>,
173 polled: AtomicBool,
174}
175
176impl SegmentEventsFuture {
177 fn new(id: SegmentId, events: Arc<SegmentEvents>) -> Self {
178 let (send, recv) = oneshot::channel::<VortexResult<ByteBuffer>>();
179
180 let this = SegmentEventsFuture {
182 future: recv
183 .map_err(|e| vortex_err!("pending segment receiver dropped: {}", e))
184 .map(|r| r.flatten())
185 .map_err(Arc::new)
186 .boxed(),
187 id,
188 source: events.clone(),
189 polled: AtomicBool::new(false),
190 };
191
192 events.submit_event(SegmentEvent::Requested(SegmentRequest {
194 id,
195 callback: send,
196 events: events.clone(),
197 }));
198
199 this
200 }
201}
202
203impl Future for SegmentEventsFuture {
204 type Output = SharedVortexResult<ByteBuffer>;
205
206 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207 if !self.polled.fetch_or(true, atomic::Ordering::Relaxed) {
208 self.source.submit_event(SegmentEvent::Polled(self.id));
209 }
210 self.future.poll_unpin(cx)
211 }
212}
213
214impl Drop for SegmentEventsFuture {
215 fn drop(&mut self) {
216 self.source.submit_event(SegmentEvent::Dropped(self.id));
217 }
218}