1use std::fmt::{Debug, Formatter};
5use std::pin::Pin;
6use std::sync::atomic::AtomicBool;
7use std::sync::{Arc, atomic};
8use std::task::{Context, Poll};
9
10use dashmap::{DashMap, Entry};
11use futures::channel::{mpsc, oneshot};
12use futures::future::{BoxFuture, Shared, WeakShared};
13use futures::stream::BoxStream;
14use futures::{FutureExt, StreamExt, TryFutureExt};
15use vortex_buffer::ByteBuffer;
16use vortex_error::{
17 ResultExt, SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_err,
18};
19
20use crate::segments::{SegmentId, SegmentSource};
21
22pub struct SegmentEvents {
27 pending: DashMap<SegmentId, PendingSegment>,
28 events: mpsc::UnboundedSender<SegmentEvent>,
29}
30
31impl SegmentEvents {
32 pub fn create() -> (Arc<dyn SegmentSource>, BoxStream<'static, SegmentEvent>) {
33 let (send, recv) = mpsc::unbounded();
34
35 let events = Arc::new(Self {
36 pending: Default::default(),
37 events: send,
38 });
39
40 let source = Arc::new(EventsSegmentSource { events });
41 let stream = recv.boxed();
42
43 (source, stream)
44 }
45}
46
47pub enum SegmentEvent {
48 Requested(SegmentRequest),
49 Polled(SegmentId),
50 Dropped(SegmentId),
51 Resolved(SegmentId),
52}
53
54impl Debug for SegmentEvent {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 match self {
57 SegmentEvent::Requested(req) => write!(
58 f,
59 "SegmentEvent::Registered({:?}, {})",
60 req.id, req.for_whom
61 ),
62 SegmentEvent::Polled(id) => write!(f, "SegmentEvent::Polled({id:?})"),
63 SegmentEvent::Dropped(id) => write!(f, "SegmentEvent::Dropped({id:?})"),
64 SegmentEvent::Resolved(id) => write!(f, "SegmentEvent::Resolved({id:?})"),
65 }
66 }
67}
68
69pub struct SegmentRequest {
70 id: SegmentId,
72 for_whom: Arc<str>,
74 callback: oneshot::Sender<VortexResult<ByteBuffer>>,
76 events: Arc<SegmentEvents>,
78}
79
80impl SegmentRequest {
81 pub fn id(&self) -> SegmentId {
82 self.id
83 }
84
85 #[doc(hidden)]
88 pub fn for_whom(&self) -> &str {
89 &self.for_whom
90 }
91
92 pub fn resolve(self, buffer: VortexResult<ByteBuffer>) {
94 self.events.submit_event(SegmentEvent::Resolved(self.id));
95 if self.callback.send(buffer).is_err() {
96 log::debug!("Segment {} dropped while request in-flight", self.id);
99 }
100 }
101}
102
103impl SegmentEvents {
104 fn segment_future(
106 self: Arc<Self>,
107 id: SegmentId,
108 for_whom: Arc<str>,
109 ) -> Shared<SegmentEventsFuture> {
110 loop {
111 match self.pending.entry(id) {
114 Entry::Occupied(e) => {
115 if let Some(fut) = e.get().future() {
116 return fut;
117 } else {
118 log::debug!("Re-requesting dropped segment from segment reader {id}");
119 e.remove();
120 }
121 }
122 Entry::Vacant(e) => {
123 let fut = SegmentEventsFuture::new(id, for_whom, self.clone()).shared();
124 e.insert(PendingSegment {
126 id,
127 fut: fut
128 .downgrade()
129 .vortex_expect("cannot fail, only just created"),
130 });
131 return fut;
132 }
133 }
134 }
135 }
136
137 fn submit_event(&self, event: SegmentEvent) {
139 if self.events.unbounded_send(event).is_err() {
140 log::trace!("Segment queue shutting down, no problem if we lose events")
141 }
142 }
143}
144
145struct EventsSegmentSource {
146 events: Arc<SegmentEvents>,
147}
148
149impl SegmentSource for EventsSegmentSource {
150 fn request(
151 &self,
152 id: SegmentId,
153 for_whom: &Arc<str>,
154 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
155 self.events
156 .clone()
157 .segment_future(id, for_whom.clone())
158 .map_err(VortexError::from)
159 .boxed()
160 }
161}
162
163struct PendingSegment {
165 id: SegmentId,
166 fut: WeakShared<SegmentEventsFuture>,
170}
171
172impl Debug for PendingSegment {
173 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct("PendingSegment")
175 .field("id", &self.id)
176 .finish()
177 }
178}
179
180impl PendingSegment {
181 fn future(&self) -> Option<Shared<SegmentEventsFuture>> {
183 self.fut.upgrade()
184 }
185}
186
187struct SegmentEventsFuture {
190 future: BoxFuture<'static, SharedVortexResult<ByteBuffer>>,
191 id: SegmentId,
192 source: Arc<SegmentEvents>,
193 polled: AtomicBool,
194}
195
196impl SegmentEventsFuture {
197 fn new(id: SegmentId, for_whom: Arc<str>, events: Arc<SegmentEvents>) -> Self {
198 let (send, recv) = oneshot::channel::<VortexResult<ByteBuffer>>();
199
200 let this = SegmentEventsFuture {
202 future: recv
203 .map_err(|e| vortex_err!("pending segment receiver dropped: {}", e))
204 .map(|r| r.unnest())
205 .map_err(Arc::new)
206 .boxed(),
207 id,
208 source: events.clone(),
209 polled: AtomicBool::new(false),
210 };
211
212 events.submit_event(SegmentEvent::Requested(SegmentRequest {
214 id,
215 for_whom,
216 callback: send,
217 events: events.clone(),
218 }));
219
220 this
221 }
222}
223
224impl Future for SegmentEventsFuture {
225 type Output = SharedVortexResult<ByteBuffer>;
226
227 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228 if !self.polled.fetch_or(true, atomic::Ordering::Relaxed) {
229 self.source.submit_event(SegmentEvent::Polled(self.id));
230 }
231 self.future.poll_unpin(cx)
232 }
233}
234
235impl Drop for SegmentEventsFuture {
236 fn drop(&mut self) {
237 self.source.submit_event(SegmentEvent::Dropped(self.id));
238 }
239}