polars_ooc/
spill_frame.rs1use std::fmt::Debug;
2use std::ops::{Deref, DerefMut};
3
4use polars_core::frame::DataFrame;
5
6use crate::spill_context::ParameterFreeSpillContext;
7use crate::{PinnedMut, PinnedRef, SpillToken, Spillable, memory_manager};
8
9impl Spillable for DataFrame {
10 type Spilled = Box<DataFrame>;
12
13 fn estimate_byte_size(&self) -> usize {
14 self.estimated_size()
15 }
16
17 async fn spill(&self) -> Self::Spilled {
18 Box::new(self.clone())
19 }
20
21 async fn unspill(location: &Self::Spilled) -> Self {
22 (**location).clone()
23 }
24}
25
26pub struct SpillFrame {
27 token: SpillToken<DataFrame>,
28 height: usize,
29}
30
31impl AsRef<SpillToken<DataFrame>> for SpillFrame {
32 fn as_ref(&self) -> &SpillToken<DataFrame> {
33 &self.token
34 }
35}
36
37impl SpillFrame {
38 pub fn new_unregistered(df: DataFrame) -> Self {
39 let height = df.height();
40 let token = SpillToken::new(df);
41 Self { token, height }
42 }
43
44 pub async fn new<C: ParameterFreeSpillContext>(df: DataFrame, ctx: &C) -> Self {
45 let slf = Self::new_unregistered(df);
46 ctx.register(&slf);
47 memory_manager().spill().await;
48 slf
49 }
50
51 pub fn new_blocking<C: ParameterFreeSpillContext>(df: DataFrame, ctx: &C) -> Self {
52 let slf = Self::new_unregistered(df);
53 ctx.register(&slf);
54 memory_manager().spill_blocking();
55 slf
56 }
57
58 pub fn height(&self) -> usize {
60 self.height
61 }
62
63 pub fn try_get(&self) -> Option<PinnedRef<'_, DataFrame>> {
65 self.token.try_get()
66 }
67
68 pub async fn get(&self) -> PinnedRef<'_, DataFrame> {
71 self.token.get().await
72 }
73
74 pub fn get_blocking(&self) -> PinnedRef<'_, DataFrame> {
76 self.token.get_blocking()
77 }
78
79 pub async fn get_mut(&mut self) -> PinnedFrameMut<'_> {
82 PinnedFrameMut {
83 inner: self.token.get_mut().await,
84 height: &mut self.height,
85 }
86 }
87
88 pub fn get_mut_blocking(&mut self) -> PinnedFrameMut<'_> {
90 PinnedFrameMut {
91 inner: self.token.get_mut_blocking(),
92 height: &mut self.height,
93 }
94 }
95
96 pub async fn into_df(self) -> DataFrame {
98 self.token.into_inner().await
99 }
100
101 pub fn into_df_blocking(self) -> DataFrame {
103 self.token.into_inner_blocking()
104 }
105}
106
107impl Debug for SpillFrame {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 let mut s = f.debug_struct("SpillFrame");
110 match self.token.try_get() {
111 Some(df) => s.field("df", &*df),
112 None => s.field("df", &"spilled"),
113 };
114 s.finish()
115 }
116}
117
118pub struct PinnedFrameMut<'a> {
119 height: &'a mut usize,
120 inner: PinnedMut<'a, DataFrame>,
121}
122
123impl<'a> Deref for PinnedFrameMut<'a> {
124 type Target = DataFrame;
125
126 fn deref(&self) -> &Self::Target {
127 &self.inner
128 }
129}
130
131impl<'a> DerefMut for PinnedFrameMut<'a> {
132 fn deref_mut(&mut self) -> &mut Self::Target {
133 &mut self.inner
134 }
135}
136
137impl<'a> Drop for PinnedFrameMut<'a> {
138 fn drop(&mut self) {
139 *self.height = self.inner.height();
140 }
141}