use crate::{
Cacheable, PolarsPlugin,
dataframe::values::{Column, NuDataFrame},
values::{CustomValueSupport, PolarsPluginObject, PolarsPluginType, cant_convert_err},
};
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::shell_error::generic::GenericError;
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Value,
};
/// Plugin command backing `polars collect`.
///
/// Stateless marker type; all behavior lives in its `PluginCommand` impl.
#[derive(Clone)]
pub struct LazyCollect;
impl PluginCommand for LazyCollect {
    type Plugin = PolarsPlugin;

    /// Name under which the command is invoked.
    fn name(&self) -> &str {
        "polars collect"
    }

    /// One-line summary of what the command does.
    fn description(&self) -> &str {
        "Collect lazy dataframe into eager dataframe."
    }

    /// Builds the command signature.
    ///
    /// Two input/output pairs are declared: an eager dataframe or a lazy
    /// frame as input, and an eager dataframe as output in both cases. The
    /// command is filed under the custom `lazyframe` category.
    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .input_output_types(vec![
                (
                    PolarsPluginType::NuDataFrame.into(),
                    PolarsPluginType::NuDataFrame.into(),
                ),
                (
                    PolarsPluginType::NuLazyFrame.into(),
                    PolarsPluginType::NuDataFrame.into(),
                ),
            ])
            .category(Category::Custom("lazyframe".into()))
    }

    /// Usage examples for the command.
    ///
    /// The single example turns a two-column table into a lazy frame and
    /// collects it again; the expected result is the same two columns as an
    /// eager dataframe.
    fn examples(&self) -> Vec<Example<'_>> {
        vec![Example {
            // Describes what the example pipeline actually does (the previous
            // text, "drop duplicates", did not match the example).
            description: "Collect a lazy dataframe into an eager dataframe",
            example: "[[a b]; [1 2] [3 4]] | polars into-lazy | polars collect",
            result: Some(
                NuDataFrame::try_from_columns(
                    vec![
                        Column::new(
                            "a".to_string(),
                            vec![Value::test_int(1), Value::test_int(3)],
                        ),
                        Column::new(
                            "b".to_string(),
                            vec![Value::test_int(2), Value::test_int(4)],
                        ),
                    ],
                    None,
                    Span::test_data(),
                )
                .expect("simple df for test should not fail")
                .into_value(Span::test_data()),
            ),
        }]
    }

    /// Executes the command on the pipeline input.
    ///
    /// * A lazy frame is collected, flagged with `from_lazy = true`, cached,
    ///   and returned as a value.
    /// * An eager dataframe is looked up in the plugin cache by its id,
    ///   rebuilt from the cached value, and returned as a value.
    /// * Any other plugin object is rejected.
    ///
    /// # Errors
    ///
    /// Returns a `LabeledError` when the input cannot be turned into a plugin
    /// object, when collecting or caching fails, when no cache entry exists
    /// for an eager dataframe's id, or when the input is neither a lazy frame
    /// nor a dataframe.
    fn run(
        &self,
        plugin: &Self::Plugin,
        engine: &EngineInterface,
        call: &EvaluatedCall,
        input: PipelineData,
    ) -> Result<PipelineData, LabeledError> {
        let value = input.into_value(call.head)?;
        match PolarsPluginObject::try_from_value(plugin, &value)? {
            PolarsPluginObject::NuLazyFrame(lazy) => {
                let mut eager = lazy.collect(call.head)?;
                // NOTE(review): presumably keeps the collected frame from
                // being treated as lazy again downstream - confirm against
                // NuDataFrame.
                eager.from_lazy = true;
                Ok(PipelineData::value(
                    eager
                        .cache(plugin, engine, call.head)?
                        .into_value(call.head),
                    None,
                ))
            }
            PolarsPluginObject::NuDataFrame(df) => {
                // NOTE(review): the `true` argument presumably bumps the
                // entry's reference count so the returned value is not
                // dropped - confirm against the cache API.
                let cv = plugin.cache.get(&df.id, true)?.ok_or_else(|| {
                    ShellError::Generic(GenericError::new(
                        format!("Failed to get cached value {}", df.id),
                        "",
                        call.head,
                    ))
                })?;
                let df = NuDataFrame::from_cache_value(cv.value.clone())?;
                Ok(PipelineData::value(df.into_value(call.head), None))
            }
            _ => Err(cant_convert_err(
                &value,
                &[PolarsPluginType::NuLazyFrame, PolarsPluginType::NuDataFrame],
            )),
        }
        // The match arms produce `ShellError`; convert once at the boundary.
        .map_err(LabeledError::from)
    }
}
#[cfg(test)]
mod test {
    use super::LazyCollect;
    use crate::test::test_polars_plugin_command;
    use nu_protocol::ShellError;

    /// Runs the command's declared examples through the shared plugin test
    /// helper and propagates any error it reports.
    #[test]
    fn test_examples() -> Result<(), ShellError> {
        test_polars_plugin_command(&LazyCollect)
    }
}