mod custom_value;
use crate::{Cacheable, PolarsPlugin};
use super::{
CustomValueSupport, NuDataFrame, NuExpression, NuSchema, PolarsPluginObject, PolarsPluginType,
cant_convert_err,
};
use core::fmt;
use nu_protocol::shell_error::generic::GenericError;
use nu_protocol::{PipelineData, ShellError, Span, Value, record};
use polars::prelude::{Expr, IntoLazy, LazyFrame};
use std::sync::Arc;
use uuid::Uuid;
pub use custom_value::NuLazyFrameCustomValue;
#[derive(Default, Clone)]
pub struct NuLazyFrame {
pub id: Uuid,
pub lazy: Arc<LazyFrame>,
pub from_eager: bool,
}
impl fmt::Debug for NuLazyFrame {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "NuLazyframe")
}
}
impl From<LazyFrame> for NuLazyFrame {
fn from(lazy_frame: LazyFrame) -> Self {
NuLazyFrame::new(false, lazy_frame)
}
}
impl NuLazyFrame {
pub fn new(from_eager: bool, lazy: LazyFrame) -> Self {
Self {
id: Uuid::new_v4(),
lazy: Arc::new(lazy),
from_eager,
}
}
pub fn from_dataframe(df: NuDataFrame) -> Self {
let lazy = df.as_ref().clone().lazy();
NuLazyFrame::new(true, lazy)
}
pub fn to_polars(&self) -> LazyFrame {
(*self.lazy).clone()
}
pub fn collect(self, span: Span) -> Result<NuDataFrame, ShellError> {
crate::handle_panic(
|| {
self.to_polars()
.collect()
.map_err(|e| {
ShellError::Generic(GenericError::new(
"Error collecting lazy frame",
e.to_string(),
span,
))
})
.map(|df| NuDataFrame::new(true, df))
},
span,
)
}
pub fn apply_with_expr<F>(self, expr: NuExpression, f: F) -> Self
where
F: Fn(LazyFrame, Expr) -> LazyFrame,
{
let df = self.to_polars();
let expr = expr.into_polars();
let new_frame = f(df, expr);
Self::new(self.from_eager, new_frame)
}
pub fn schema(&mut self) -> Result<NuSchema, ShellError> {
let internal_schema = Arc::make_mut(&mut self.lazy)
.collect_schema()
.map_err(|e| {
ShellError::Generic(GenericError::new_internal(
"Error getting schema from lazy frame",
e.to_string(),
))
})?;
Ok(internal_schema.into())
}
pub fn try_from_value_coerce(
plugin: &PolarsPlugin,
value: &Value,
) -> Result<NuLazyFrame, ShellError> {
match PolarsPluginObject::try_from_value(plugin, value)? {
PolarsPluginObject::NuDataFrame(df) => Ok(df.lazy()),
PolarsPluginObject::NuLazyFrame(lazy) => Ok(lazy),
_ => Err(cant_convert_err(
value,
&[PolarsPluginType::NuDataFrame, PolarsPluginType::NuLazyFrame],
)),
}
}
pub fn try_from_pipeline_coerce(
plugin: &PolarsPlugin,
input: PipelineData,
span: Span,
) -> Result<Self, ShellError> {
let value = input.into_value(span)?;
Self::try_from_value_coerce(plugin, &value)
}
}
impl Cacheable for NuLazyFrame {
fn cache_id(&self) -> &Uuid {
&self.id
}
fn to_cache_value(&self) -> Result<PolarsPluginObject, ShellError> {
Ok(PolarsPluginObject::NuLazyFrame(self.clone()))
}
fn from_cache_value(cv: PolarsPluginObject) -> Result<Self, ShellError> {
match cv {
PolarsPluginObject::NuLazyFrame(df) => Ok(df),
_ => Err(ShellError::Generic(GenericError::new_internal(
"Cache value is not a lazyframe",
"",
))),
}
}
}
impl CustomValueSupport for NuLazyFrame {
type CV = NuLazyFrameCustomValue;
fn custom_value(self) -> Self::CV {
NuLazyFrameCustomValue {
id: self.id,
lazyframe: Some(self),
}
}
fn get_type_static() -> PolarsPluginType {
PolarsPluginType::NuLazyFrame
}
fn base_value(self, span: Span) -> Result<Value, ShellError> {
let optimized_plan = self
.lazy
.describe_optimized_plan()
.unwrap_or_else(|_| "<NOT AVAILABLE>".to_string());
Ok(Value::record(
record! {
"plan" => Value::string(
self.lazy.describe_plan().map_err(|e| {
ShellError::Generic(GenericError::new(
"Error getting plan",
e.to_string(),
span,
))
})?,
span),
"optimized_plan" => Value::string(optimized_plan, span),
},
span,
))
}
}