1use std::fmt::{Debug, Formatter};
2use std::pin::Pin;
3use std::sync::atomic::AtomicBool;
4use std::sync::{Arc, atomic};
5use std::task::{Context, Poll};
6
7use dashmap::{DashMap, Entry};
8use futures::channel::{mpsc, oneshot};
9use futures::future::{BoxFuture, Shared, WeakShared};
10use futures::stream::BoxStream;
11use futures::{FutureExt, StreamExt, TryFutureExt};
12use vortex_buffer::ByteBuffer;
13use vortex_error::{
14 ResultExt, SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_err,
15};
16
17use crate::segments::{SegmentId, SegmentSource};
18
19pub struct SegmentEvents {
24 pending: DashMap<SegmentId, PendingSegment>,
25 events: mpsc::UnboundedSender<SegmentEvent>,
26}
27
28impl SegmentEvents {
29 pub fn create() -> (Arc<dyn SegmentSource>, BoxStream<'static, SegmentEvent>) {
30 let (send, recv) = mpsc::unbounded();
31
32 let events = Arc::new(Self {
33 pending: Default::default(),
34 events: send,
35 });
36
37 let source = Arc::new(EventsSegmentSource { events });
38 let stream = recv.boxed();
39
40 (source, stream)
41 }
42}
43
44pub enum SegmentEvent {
45 Requested(SegmentRequest),
46 Polled(SegmentId),
47 Dropped(SegmentId),
48 Resolved(SegmentId),
49}
50
51impl Debug for SegmentEvent {
52 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53 match self {
54 SegmentEvent::Requested(req) => write!(
55 f,
56 "SegmentEvent::Registered({:?}, {})",
57 req.id, req.for_whom
58 ),
59 SegmentEvent::Polled(id) => write!(f, "SegmentEvent::Polled({:?})", id),
60 SegmentEvent::Dropped(id) => write!(f, "SegmentEvent::Dropped({:?})", id),
61 SegmentEvent::Resolved(id) => write!(f, "SegmentEvent::Resolved({:?})", id),
62 }
63 }
64}
65
66pub struct SegmentRequest {
67 id: SegmentId,
69 for_whom: Arc<str>,
71 callback: oneshot::Sender<VortexResult<ByteBuffer>>,
73 events: Arc<SegmentEvents>,
75}
76
77impl SegmentRequest {
78 pub fn id(&self) -> SegmentId {
79 self.id
80 }
81
82 pub fn resolve(self, buffer: VortexResult<ByteBuffer>) {
84 self.events.submit_event(SegmentEvent::Resolved(self.id));
85 if self.callback.send(buffer).is_err() {
86 log::debug!("Segment {} dropped while request in-flight", self.id);
89 }
90 }
91}
92
93impl SegmentEvents {
94 fn segment_future(
96 self: Arc<Self>,
97 id: SegmentId,
98 for_whom: Arc<str>,
99 ) -> Shared<SegmentEventsFuture> {
100 loop {
101 match self.pending.entry(id) {
104 Entry::Occupied(e) => {
105 if let Some(fut) = e.get().future() {
106 return fut;
107 } else {
108 log::debug!("Re-requesting dropped segment from segment reader {}", id);
109 e.remove();
110 }
111 }
112 Entry::Vacant(e) => {
113 let fut = SegmentEventsFuture::new(id, for_whom, self.clone()).shared();
114 e.insert(PendingSegment {
116 id,
117 fut: fut
118 .downgrade()
119 .vortex_expect("cannot fail, only just created"),
120 });
121 return fut;
122 }
123 }
124 }
125 }
126
127 fn submit_event(&self, event: SegmentEvent) {
129 if self.events.unbounded_send(event).is_err() {
130 log::trace!("Segment queue shutting down, no problem if we lose events")
131 }
132 }
133}
134
135struct EventsSegmentSource {
136 events: Arc<SegmentEvents>,
137}
138
139impl SegmentSource for EventsSegmentSource {
140 fn request(
141 &self,
142 id: SegmentId,
143 for_whom: &Arc<str>,
144 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
145 self.events
146 .clone()
147 .segment_future(id, for_whom.clone())
148 .map_err(VortexError::from)
149 .boxed()
150 }
151}
152
153pub struct PendingSegment {
155 id: SegmentId,
156 fut: WeakShared<SegmentEventsFuture>,
160}
161
162impl Debug for PendingSegment {
163 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("PendingSegment")
165 .field("id", &self.id)
166 .finish()
167 }
168}
169
170impl PendingSegment {
171 fn future(&self) -> Option<Shared<SegmentEventsFuture>> {
173 self.fut.upgrade()
174 }
175}
176
177struct SegmentEventsFuture {
180 future: BoxFuture<'static, SharedVortexResult<ByteBuffer>>,
181 id: SegmentId,
182 source: Arc<SegmentEvents>,
183 polled: AtomicBool,
184}
185
186impl SegmentEventsFuture {
187 fn new(id: SegmentId, for_whom: Arc<str>, events: Arc<SegmentEvents>) -> Self {
188 let (send, recv) = oneshot::channel::<VortexResult<ByteBuffer>>();
189
190 let this = SegmentEventsFuture {
192 future: recv
193 .map_err(|e| vortex_err!("pending segment receiver dropped: {}", e))
194 .map(|r| r.unnest())
195 .map_err(Arc::new)
196 .boxed(),
197 id,
198 source: events.clone(),
199 polled: AtomicBool::new(false),
200 };
201
202 events.submit_event(SegmentEvent::Requested(SegmentRequest {
204 id,
205 for_whom,
206 callback: send,
207 events: events.clone(),
208 }));
209
210 this
211 }
212}
213
214impl Future for SegmentEventsFuture {
215 type Output = SharedVortexResult<ByteBuffer>;
216
217 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
218 if !self.polled.fetch_or(true, atomic::Ordering::Relaxed) {
219 self.source.submit_event(SegmentEvent::Polled(self.id));
220 }
221 self.future.poll_unpin(cx)
222 }
223}
224
225impl Drop for SegmentEventsFuture {
226 fn drop(&mut self) {
227 self.source.submit_event(SegmentEvent::Dropped(self.id));
228 }
229}