// polars_ooc/spill_frame.rs

1use std::fmt::Debug;
2use std::ops::{Deref, DerefMut};
3
4use polars_core::frame::DataFrame;
5
6use crate::spill_context::ParameterFreeSpillContext;
7use crate::{PinnedMut, PinnedRef, SpillToken, Spillable, memory_manager};
8
9impl Spillable for DataFrame {
10    // TODO: just a dummy spill for now. Boxed to reduce size.
11    type Spilled = Box<DataFrame>;
12
13    fn estimate_byte_size(&self) -> usize {
14        self.estimated_size()
15    }
16
17    async fn spill(&self) -> Self::Spilled {
18        Box::new(self.clone())
19    }
20
21    async fn unspill(location: &Self::Spilled) -> Self {
22        (**location).clone()
23    }
24}
25
26pub struct SpillFrame {
27    token: SpillToken<DataFrame>,
28    height: usize,
29}
30
31impl AsRef<SpillToken<DataFrame>> for SpillFrame {
32    fn as_ref(&self) -> &SpillToken<DataFrame> {
33        &self.token
34    }
35}
36
37impl SpillFrame {
38    pub fn new_unregistered(df: DataFrame) -> Self {
39        let height = df.height();
40        let token = SpillToken::new(df);
41        Self { token, height }
42    }
43
44    pub async fn new<C: ParameterFreeSpillContext>(df: DataFrame, ctx: &C) -> Self {
45        let slf = Self::new_unregistered(df);
46        ctx.register(&slf);
47        memory_manager().spill().await;
48        slf
49    }
50
51    pub fn new_blocking<C: ParameterFreeSpillContext>(df: DataFrame, ctx: &C) -> Self {
52        let slf = Self::new_unregistered(df);
53        ctx.register(&slf);
54        memory_manager().spill_blocking();
55        slf
56    }
57
58    /// The height of the contained DataFrame. Does not need to unspill DataFrame.
59    pub fn height(&self) -> usize {
60        self.height
61    }
62
63    /// Get a reference to the underlying DataFrame, returning None if it was spilled.
64    pub fn try_get(&self) -> Option<PinnedRef<'_, DataFrame>> {
65        self.token.try_get()
66    }
67
68    /// Get a reference to the underlying DataFrame, unspilling it if it
69    /// was spilled.
70    pub async fn get(&self) -> PinnedRef<'_, DataFrame> {
71        self.token.get().await
72    }
73
74    /// Blocking version of get.
75    pub fn get_blocking(&self) -> PinnedRef<'_, DataFrame> {
76        self.token.get_blocking()
77    }
78
79    /// Get a mutable reference to the underlying DataFrame, unspilling it if it
80    /// was spilled.
81    pub async fn get_mut(&mut self) -> PinnedFrameMut<'_> {
82        PinnedFrameMut {
83            inner: self.token.get_mut().await,
84            height: &mut self.height,
85        }
86    }
87
88    /// Blocking version of get_mut.
89    pub fn get_mut_blocking(&mut self) -> PinnedFrameMut<'_> {
90        PinnedFrameMut {
91            inner: self.token.get_mut_blocking(),
92            height: &mut self.height,
93        }
94    }
95
96    /// Consumes this SpillFrame, unspilling it if it were spilled.
97    pub async fn into_df(self) -> DataFrame {
98        self.token.into_inner().await
99    }
100
101    /// Blocking version of into_df.
102    pub fn into_df_blocking(self) -> DataFrame {
103        self.token.into_inner_blocking()
104    }
105}
106
107impl Debug for SpillFrame {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        let mut s = f.debug_struct("SpillFrame");
110        match self.token.try_get() {
111            Some(df) => s.field("df", &*df),
112            None => s.field("df", &"spilled"),
113        };
114        s.finish()
115    }
116}
117
118pub struct PinnedFrameMut<'a> {
119    height: &'a mut usize,
120    inner: PinnedMut<'a, DataFrame>,
121}
122
123impl<'a> Deref for PinnedFrameMut<'a> {
124    type Target = DataFrame;
125
126    fn deref(&self) -> &Self::Target {
127        &self.inner
128    }
129}
130
131impl<'a> DerefMut for PinnedFrameMut<'a> {
132    fn deref_mut(&mut self) -> &mut Self::Target {
133        &mut self.inner
134    }
135}
136
137impl<'a> Drop for PinnedFrameMut<'a> {
138    fn drop(&mut self) {
139        *self.height = self.inner.height();
140    }
141}