use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, Signature, SyntaxShape, Type, Value,
};
use crate::{UlidEngine, UlidPlugin};
/// Batch size used by both commands in this file when `--batch-size` is not supplied.
const DEFAULT_BATCH_SIZE: usize = 1_000;
/// Largest `count` accepted by `ulid generate-stream`; larger requests are rejected.
const MAX_STREAM_COUNT: usize = 100_000;
/// Plugin command `ulid stream`: applies one ULID operation to every item of an input list.
pub struct UlidStreamCommand;

impl PluginCommand for UlidStreamCommand {
    type Plugin = UlidPlugin;

    fn name(&self) -> &str {
        "ulid stream"
    }

    fn description(&self) -> &str {
        "Stream-process large datasets of ULIDs with memory-efficient operations"
    }

    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .required(
                "operation",
                SyntaxShape::String,
                "Operation to perform: validate, parse, extract-timestamp, or transform",
            )
            .named(
                "batch-size",
                SyntaxShape::Int,
                "Number of items to process in each batch (default: 1000)",
                Some('b'),
            )
            .named(
                "output-format",
                SyntaxShape::String,
                "Output format for parsed data: compact, full, timestamp-only",
                Some('f'),
            )
            .switch(
                "continue-on-error",
                "Continue processing despite individual item errors",
                Some('c'),
            )
            .input_output_types(vec![
                (
                    Type::List(Box::new(Type::String)),
                    Type::List(Box::new(Type::Any)),
                ),
                (
                    Type::List(Box::new(Type::Record(vec![].into()))),
                    Type::List(Box::new(Type::Any)),
                ),
            ])
            .category(Category::Filters)
    }

    fn examples(&self) -> Vec<Example<'_>> {
        vec![
            Example {
                example: r#"["01AN4Z07BY79KA1307SR9X4MV3", "01AN4Z07BZ79KA1307SR9X4MV4"] | ulid stream validate"#,
                description: "Stream-validate a list of ULIDs",
                result: None,
            },
            Example {
                example: r#"$large_ulid_list | ulid stream parse --batch-size 500"#,
                description: "Parse large ULID list in batches of 500",
                result: None,
            },
            Example {
                example: r#"$ulid_data | ulid stream transform --output-format compact --continue-on-error"#,
                description: "Transform ULIDs to compact format, continuing on errors",
                result: None,
            },
        ]
    }

    /// Reads the operation and flags from `call`, then runs the operation over the input list.
    ///
    /// * A list value is processed item by item and returned as a new list that keeps the
    ///   input list's span.
    /// * Empty pipeline input is passed through as empty output.
    /// * Any other input is rejected with an "Invalid input" error.
    ///
    /// `--output-format` defaults to `"full"`. `--batch-size` defaults to
    /// `DEFAULT_BATCH_SIZE` and is clamped to at least 1.
    ///
    /// # Errors
    ///
    /// Returns an error when an argument cannot be read, when the input is not a list,
    /// or when processing an item fails and `--continue-on-error` is not set.
    fn run(
        &self,
        _plugin: &Self::Plugin,
        _engine: &EngineInterface,
        call: &EvaluatedCall,
        input: PipelineData,
    ) -> Result<PipelineData, LabeledError> {
        let operation: String = call.req(0)?;
        let requested_batch: Option<i64> = call.get_flag("batch-size")?;
        let output_format: Option<String> = call.get_flag("output-format")?;
        let continue_on_error: bool = call.has_flag("continue-on-error")?;

        // A batch size of zero cannot be chunked (`div_ceil(0)` and slice `chunks(0)`
        // both panic), and a negative value would wrap to an enormous `usize` under an
        // `as` cast. Clamp to at least one item per batch, which is also how
        // `ulid generate-stream` treats the same flag.
        let batch_size = usize::try_from(
            requested_batch
                .unwrap_or(DEFAULT_BATCH_SIZE as i64)
                .max(1),
        )
        .unwrap_or(usize::MAX);
        let format = output_format.unwrap_or_else(|| "full".to_string());

        match input {
            PipelineData::Value(
                Value::List {
                    vals,
                    internal_span,
                    ..
                },
                _,
            ) => {
                let result = process_stream(
                    &vals,
                    &operation,
                    batch_size,
                    &format,
                    continue_on_error,
                    call.head,
                )
                .map_err(|e| *e)?;
                Ok(PipelineData::Value(
                    Value::list(result, internal_span),
                    None,
                ))
            }
            PipelineData::Empty => Ok(PipelineData::Empty),
            _ => Err(LabeledError::new("Invalid input").with_label(
                "Expected a list of ULIDs or ULID-containing records",
                call.head,
            )),
        }
    }
}
/// Applies `operation` to every value in `input_vals`, working through the slice in
/// chunks of `batch_size` items and returning the per-item results in input order.
///
/// When the input spans more than ten batches, a progress line is written to stderr
/// roughly every tenth of the way through.
///
/// A `batch_size` of zero is treated as one: `div_ceil(0)` and `chunks(0)` would
/// otherwise panic.
///
/// # Errors
///
/// Propagates the first error returned by `process_batch_sequential`.
fn process_stream(
    input_vals: &[Value],
    operation: &str,
    batch_size: usize,
    output_format: &str,
    continue_on_error: bool,
    call_head: nu_protocol::Span,
) -> Result<Vec<Value>, Box<LabeledError>> {
    if input_vals.is_empty() {
        return Ok(Vec::new());
    }

    // Guard against a zero batch size, which would divide by zero below and make
    // `chunks` panic.
    let batch_size = batch_size.max(1);
    let total_batches = input_vals.len().div_ceil(batch_size);
    // Loop-invariant: how many batches pass between two progress lines.
    let progress_step = (total_batches / 10).max(1);

    // Each input item yields exactly one output value on the success path.
    let mut results = Vec::with_capacity(input_vals.len());
    for (batch_idx, chunk) in input_vals.chunks(batch_size).enumerate() {
        if total_batches > 10 && batch_idx % progress_step == 0 {
            eprintln!(
                "Processing batch {}/{} ({:.1}%)",
                batch_idx + 1,
                total_batches,
                (batch_idx as f64 / total_batches as f64) * 100.0
            );
        }
        let batch_results = process_batch_sequential(
            chunk,
            operation,
            output_format,
            continue_on_error,
            call_head,
        )?;
        results.extend(batch_results);
    }
    Ok(results)
}
fn process_batch_sequential(
batch: &[Value],
operation: &str,
output_format: &str,
continue_on_error: bool,
call_head: nu_protocol::Span,
) -> Result<Vec<Value>, Box<LabeledError>> {
let mut results = Vec::new();
for value in batch {
match process_single_item(value, operation, output_format, call_head) {
Ok(result) => results.push(result),
Err(e) => {
if continue_on_error {
let mut error_record = nu_protocol::Record::new();
error_record.push("error", Value::string(e.msg, call_head));
error_record.push("input", value.clone());
results.push(Value::record(error_record, call_head));
} else {
return Err(e);
}
}
}
}
Ok(results)
}
/// Applies `operation` to a single input value and returns the resulting value.
///
/// The ULID text is first pulled out of `value` via `extract_ulid_string`. Then:
///
/// * `validate` yields a boolean.
/// * `parse` yields a `ulid`/`timestamp_ms`/`randomness` record for `compact`, an
///   integer timestamp for `timestamp-only`, and the value built by
///   `UlidEngine::components_to_value` for any other format.
/// * `extract-timestamp` yields an integer timestamp.
/// * `transform` rejects invalid ULIDs, then yields a one-field `ulid` record for
///   `compact` and the plain string for any other format.
///
/// Any other operation name is an "Invalid operation" error.
fn process_single_item(
    value: &Value,
    operation: &str,
    output_format: &str,
    call_head: nu_protocol::Span,
) -> Result<Value, Box<LabeledError>> {
    // Every error raised here has the same shape: a title plus a label at the call site.
    let failure = |title: &str, detail: String| -> Box<LabeledError> {
        Box::new(LabeledError::new(title).with_label(detail, call_head))
    };

    let ulid_str = extract_ulid_string(value)?;

    let produced = match operation {
        "validate" => Value::bool(UlidEngine::validate(&ulid_str), call_head),
        "parse" => {
            let parsed = UlidEngine::parse(&ulid_str)
                .map_err(|e| failure("Parse failed", e.to_string()))?;
            if output_format == "compact" {
                let mut fields = nu_protocol::Record::new();
                fields.push("ulid", Value::string(&parsed.ulid, call_head));
                fields.push(
                    "timestamp_ms",
                    Value::int(parsed.timestamp_ms as i64, call_head),
                );
                fields.push(
                    "randomness",
                    Value::string(&parsed.randomness_hex, call_head),
                );
                Value::record(fields, call_head)
            } else if output_format == "timestamp-only" {
                Value::int(parsed.timestamp_ms as i64, call_head)
            } else {
                UlidEngine::components_to_value(&parsed, call_head)
            }
        }
        "extract-timestamp" => {
            let millis = UlidEngine::extract_timestamp(&ulid_str)
                .map_err(|e| failure("Timestamp extraction failed", e.to_string()))?;
            Value::int(millis as i64, call_head)
        }
        "transform" => {
            if !UlidEngine::validate(&ulid_str) {
                return Err(failure(
                    "Invalid ULID",
                    format!("'{}' is not a valid ULID", ulid_str),
                ));
            }
            if output_format == "compact" {
                let mut fields = nu_protocol::Record::new();
                fields.push("ulid", Value::string(&ulid_str, call_head));
                Value::record(fields, call_head)
            } else {
                Value::string(&ulid_str, call_head)
            }
        }
        _ => {
            return Err(failure(
                "Invalid operation",
                format!(
                    "Unknown operation '{}'. Valid operations: validate, parse, extract-timestamp, transform",
                    operation
                ),
            ));
        }
    };

    Ok(produced)
}
/// Pulls the ULID text out of an input value.
///
/// A string value is returned as-is. For a record, the fields `ulid`, `id`,
/// `identifier` and `uuid` are checked in that order and the first one holding a
/// string is returned.
///
/// # Errors
///
/// Returns "No ULID field found" for a record with no matching string field and
/// "Invalid value type" for any other kind of value. Both errors are labelled with the
/// span of the offending value so the diagnostic points at it.
fn extract_ulid_string(value: &Value) -> Result<String, Box<LabeledError>> {
    match value {
        Value::String { val, .. } => Ok(val.clone()),
        Value::Record { val, .. } => {
            for field_name in ["ulid", "id", "identifier", "uuid"] {
                if let Some(Value::String { val, .. }) = val.get(field_name) {
                    return Ok(val.clone());
                }
            }
            Err(Box::new(
                LabeledError::new("No ULID field found").with_label(
                    "Record must contain a ULID in 'ulid', 'id', 'identifier', or 'uuid' field",
                    // Label the record itself rather than an unknown location.
                    value.span(),
                ),
            ))
        }
        _ => Err(Box::new(
            LabeledError::new("Invalid value type").with_label(
                "Expected string or record containing ULID",
                value.span(),
            ),
        )),
    }
}
pub struct UlidGenerateStreamCommand;
impl PluginCommand for UlidGenerateStreamCommand {
type Plugin = UlidPlugin;
fn name(&self) -> &str {
"ulid generate-stream"
}
fn description(&self) -> &str {
"Generate a continuous stream of ULIDs with memory-efficient batch processing"
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.required(
"count",
SyntaxShape::Int,
"Total number of ULIDs to generate",
)
.named(
"batch-size",
SyntaxShape::Int,
"Number of ULIDs to generate per batch (default: 1000)",
Some('b'),
)
.named(
"timestamp",
SyntaxShape::Int,
"Base timestamp in milliseconds (incremented for each ULID)",
Some('t'),
)
.switch(
"unique-timestamps",
"Ensure each ULID has a unique timestamp",
Some('u'),
)
.input_output_types(vec![(Type::Nothing, Type::List(Box::new(Type::String)))])
.category(Category::Generators)
}
fn examples(&self) -> Vec<Example<'_>> {
vec![
Example {
example: "ulid generate-stream 10000",
description: "Generate 10,000 ULIDs in memory-efficient batches",
result: None,
},
Example {
example: "ulid generate-stream 50000 --batch-size 500",
description: "Generate 50,000 ULIDs in batches of 500",
result: None,
},
Example {
example: "ulid generate-stream 1000 --unique-timestamps",
description: "Generate 1,000 ULIDs with guaranteed unique timestamps",
result: None,
},
]
}
fn run(
&self,
_plugin: &Self::Plugin,
_engine: &EngineInterface,
call: &EvaluatedCall,
_input: PipelineData,
) -> Result<PipelineData, LabeledError> {
let count: i64 = call.req(0)?;
let batch_size: Option<i64> = call.get_flag("batch-size")?;
let base_timestamp: Option<i64> = call.get_flag("timestamp")?;
let unique_timestamps: bool = call.has_flag("unique-timestamps")?;
if count < 0 {
return Err(
LabeledError::new("Invalid count").with_label("Count must be positive", call.head)
);
}
if count > MAX_STREAM_COUNT as i64 {
return Err(LabeledError::new("Count too large").with_label(
"Maximum count is 100,000 for streaming generation",
call.head,
));
}
let count = count as usize;
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE as i64).max(1) as usize;
let mut results = Vec::new();
let total_batches = count.div_ceil(batch_size);
let mut current_timestamp = base_timestamp.map(|t| t as u64);
for batch_idx in 0..total_batches {
let remaining = count - batch_idx * batch_size;
let current_batch_size = remaining.min(batch_size);
if total_batches > 10 && batch_idx % (total_batches / 10).max(1) == 0 {
eprintln!(
"Generating batch {}/{} ({:.1}%)",
batch_idx + 1,
total_batches,
(batch_idx as f64 / total_batches as f64) * 100.0
);
}
let batch_results = if let Some(ref mut timestamp) = current_timestamp {
generate_batch_with_timestamps(
current_batch_size,
timestamp,
unique_timestamps,
call.head,
)
.map_err(|e| *e)?
} else {
generate_batch_random(current_batch_size, call.head).map_err(|e| *e)?
};
results.extend(batch_results);
}
Ok(PipelineData::Value(Value::list(results, call.head), None))
}
}
/// Generates `count` ULIDs from `base_timestamp`, returning them as string values.
///
/// When `unique_timestamps` is set, `base_timestamp` is advanced by one after each
/// ULID, so the caller's counter carries over into the next batch. Otherwise it is
/// left untouched and every ULID shares the same timestamp.
///
/// # Errors
///
/// Returns "Generation failed" when `UlidEngine::generate_with_timestamp` fails, or
/// when advancing the timestamp would overflow `u64`.
fn generate_batch_with_timestamps(
    count: usize,
    base_timestamp: &mut u64,
    unique_timestamps: bool,
    call_head: nu_protocol::Span,
) -> Result<Vec<Value>, Box<LabeledError>> {
    let mut results = Vec::with_capacity(count);
    for _ in 0..count {
        let ulid = UlidEngine::generate_with_timestamp(*base_timestamp).map_err(|e| {
            Box::new(LabeledError::new("Generation failed").with_label(e.to_string(), call_head))
        })?;
        results.push(Value::string(ulid.to_string(), call_head));
        if unique_timestamps {
            // A plain `+= 1` panics in debug builds and wraps to zero in release builds
            // at `u64::MAX`; report the overflow instead.
            *base_timestamp = base_timestamp.checked_add(1).ok_or_else(|| {
                Box::new(LabeledError::new("Generation failed").with_label(
                    "Timestamp overflowed while assigning unique timestamps",
                    call_head,
                ))
            })?;
        }
    }
    Ok(results)
}
/// Generates `count` ULIDs through `UlidEngine::generate_bulk` and wraps each one as a
/// string value.
///
/// A failure from the engine is reported as a "Bulk generation failed" error labelled
/// at `call_head`.
fn generate_batch_random(
    count: usize,
    call_head: nu_protocol::Span,
) -> Result<Vec<Value>, Box<LabeledError>> {
    match UlidEngine::generate_bulk(count) {
        Ok(batch) => {
            let mut values = Vec::new();
            for ulid in batch {
                values.push(Value::string(ulid.to_string(), call_head));
            }
            Ok(values)
        }
        Err(e) => Err(Box::new(
            LabeledError::new("Bulk generation failed").with_label(e.to_string(), call_head),
        )),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use nu_protocol::Span;

    /// Span attached to every value these tests build.
    fn test_span() -> Span {
        Span::test_data()
    }

    /// ULID string used as the valid input throughout these tests.
    const TEST_ULID: &str = "01AN4Z07BY79KA1307SR9X4MV3";

    /// Wraps each string in `ulids` as a string `Value`.
    fn make_ulid_values(ulids: &[&str]) -> Vec<Value> {
        let mut values = Vec::with_capacity(ulids.len());
        for text in ulids {
            values.push(Value::string(*text, test_span()));
        }
        values
    }

    /// Metadata checks for `ulid stream`.
    mod stream_command {
        use super::*;

        #[test]
        fn test_command_name() {
            let command = UlidStreamCommand;
            assert_eq!(command.name(), "ulid stream");
        }

        #[test]
        fn test_command_signature() {
            let signature = UlidStreamCommand.signature();
            assert_eq!(signature.name, "ulid stream");

            let has_flag = |name: &str| signature.named.iter().any(|flag| flag.long == name);
            for expected in ["batch-size", "output-format", "continue-on-error"] {
                assert!(has_flag(expected), "missing flag --{expected}");
            }
            assert!(!has_flag("parallel"));
        }

        #[test]
        fn test_command_examples_not_empty() {
            let examples = UlidStreamCommand.examples();
            assert!(!examples.is_empty());
        }
    }

    /// Behaviour of `process_stream` for each operation and output format.
    mod process_stream_tests {
        use super::*;

        #[test]
        fn test_empty_input_returns_empty() {
            let outcome = process_stream(&[], "validate", 100, "full", false, test_span());
            assert!(outcome.is_ok_and(|outputs| outputs.is_empty()));
        }

        #[test]
        fn test_validate_operation() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outputs =
                process_stream(&inputs, "validate", 100, "full", false, test_span()).unwrap();
            assert_eq!(outputs.len(), 1);
            assert!(outputs[0].as_bool().unwrap());
        }

        #[test]
        fn test_validate_invalid_ulid() {
            let inputs = make_ulid_values(&["not-a-ulid"]);
            let outputs =
                process_stream(&inputs, "validate", 100, "full", false, test_span()).unwrap();
            assert!(!outputs[0].as_bool().unwrap());
        }

        #[test]
        fn test_extract_timestamp_operation() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outputs =
                process_stream(&inputs, "extract-timestamp", 100, "full", false, test_span())
                    .unwrap();
            assert_eq!(outputs.len(), 1);
            assert!(outputs[0].as_int().is_ok());
        }

        #[test]
        fn test_parse_operation_full_format() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outputs =
                process_stream(&inputs, "parse", 100, "full", false, test_span()).unwrap();
            assert_eq!(outputs.len(), 1);
            assert!(outputs[0].as_record().is_ok());
        }

        #[test]
        fn test_parse_operation_compact_format() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outputs =
                process_stream(&inputs, "parse", 100, "compact", false, test_span()).unwrap();
            let fields = outputs[0].as_record().unwrap();
            assert!(fields.get("ulid").is_some());
            assert!(fields.get("timestamp_ms").is_some());
            assert!(fields.get("randomness").is_some());
        }

        #[test]
        fn test_parse_operation_timestamp_only_format() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outputs =
                process_stream(&inputs, "parse", 100, "timestamp-only", false, test_span())
                    .unwrap();
            assert!(outputs[0].as_int().is_ok());
        }

        #[test]
        fn test_transform_operation() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outputs =
                process_stream(&inputs, "transform", 100, "full", false, test_span()).unwrap();
            assert_eq!(outputs[0].as_str().unwrap(), TEST_ULID);
        }

        #[test]
        fn test_transform_compact_format() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outputs =
                process_stream(&inputs, "transform", 100, "compact", false, test_span()).unwrap();
            let fields = outputs[0].as_record().unwrap();
            assert_eq!(fields.get("ulid").unwrap().as_str().unwrap(), TEST_ULID);
        }

        #[test]
        fn test_invalid_operation_returns_error() {
            let inputs = make_ulid_values(&[TEST_ULID]);
            let outcome = process_stream(&inputs, "bogus", 100, "full", false, test_span());
            assert!(outcome.is_err());
        }

        #[test]
        fn test_multiple_items() {
            let inputs = make_ulid_values(&[TEST_ULID, TEST_ULID]);
            let outputs =
                process_stream(&inputs, "validate", 100, "full", false, test_span()).unwrap();
            assert_eq!(outputs.len(), 2);
        }

        #[test]
        fn test_batching_produces_correct_count() {
            // Three items with a batch size of two forces a short final batch.
            let inputs = make_ulid_values(&[TEST_ULID, TEST_ULID, TEST_ULID]);
            let outputs =
                process_stream(&inputs, "validate", 2, "full", false, test_span()).unwrap();
            assert_eq!(outputs.len(), 3);
        }

        #[test]
        fn test_continue_on_error() {
            let inputs = make_ulid_values(&["bad-ulid", TEST_ULID]);
            let outputs =
                process_stream(&inputs, "extract-timestamp", 100, "full", true, test_span())
                    .unwrap();
            assert_eq!(outputs.len(), 2);

            // The failing item is replaced by an error record; the good one still succeeds.
            let failure = outputs[0].as_record().unwrap();
            assert!(failure.get("error").is_some());
            assert!(failure.get("input").is_some());
            assert!(outputs[1].as_int().is_ok());
        }

        #[test]
        fn test_error_without_continue_on_error() {
            let inputs = make_ulid_values(&["bad-ulid"]);
            let outcome =
                process_stream(&inputs, "extract-timestamp", 100, "full", false, test_span());
            assert!(outcome.is_err());
        }
    }

    /// Behaviour of `extract_ulid_string` for each supported and unsupported input shape.
    mod extract_ulid_string_tests {
        use super::*;

        /// Builds a record value with a single string field.
        fn single_field_record(field: &str, text: &str) -> Value {
            let mut fields = nu_protocol::Record::new();
            fields.push(field, Value::string(text, test_span()));
            Value::record(fields, test_span())
        }

        #[test]
        fn test_from_string_value() {
            let input = Value::string(TEST_ULID, test_span());
            assert_eq!(extract_ulid_string(&input).unwrap(), TEST_ULID);
        }

        #[test]
        fn test_from_record_with_ulid_field() {
            let input = single_field_record("ulid", TEST_ULID);
            assert_eq!(extract_ulid_string(&input).unwrap(), TEST_ULID);
        }

        #[test]
        fn test_from_record_with_id_field() {
            let input = single_field_record("id", TEST_ULID);
            assert_eq!(extract_ulid_string(&input).unwrap(), TEST_ULID);
        }

        #[test]
        fn test_from_record_missing_field() {
            let input = single_field_record("other", "value");
            assert!(extract_ulid_string(&input).is_err());
        }

        #[test]
        fn test_from_invalid_type() {
            let input = Value::int(42, test_span());
            assert!(extract_ulid_string(&input).is_err());
        }
    }

    /// Metadata checks for `ulid generate-stream`.
    mod generate_stream_command {
        use super::*;

        #[test]
        fn test_command_name() {
            let command = UlidGenerateStreamCommand;
            assert_eq!(command.name(), "ulid generate-stream");
        }

        #[test]
        fn test_command_signature() {
            let signature = UlidGenerateStreamCommand.signature();
            assert_eq!(signature.name, "ulid generate-stream");

            let has_flag = |name: &str| signature.named.iter().any(|flag| flag.long == name);
            for expected in ["batch-size", "timestamp", "unique-timestamps"] {
                assert!(has_flag(expected), "missing flag --{expected}");
            }
        }

        #[test]
        fn test_command_examples_not_empty() {
            let examples = UlidGenerateStreamCommand.examples();
            assert!(!examples.is_empty());
        }
    }

    /// Behaviour of the two batch generators.
    mod generate_batch_tests {
        use super::*;

        #[test]
        fn test_generate_batch_random() {
            let generated = generate_batch_random(5, test_span()).unwrap();
            assert_eq!(generated.len(), 5);
            assert!(generated.iter().all(|value| value.as_str().is_ok()));
        }

        #[test]
        fn test_generate_batch_with_timestamps() {
            let mut clock = 1_000_000u64;
            let generated =
                generate_batch_with_timestamps(3, &mut clock, false, test_span()).unwrap();
            assert_eq!(generated.len(), 3);
            // Without unique timestamps the shared counter must not move.
            assert_eq!(clock, 1_000_000);
        }

        #[test]
        fn test_generate_batch_with_unique_timestamps() {
            let mut clock = 1_000_000u64;
            let generated =
                generate_batch_with_timestamps(3, &mut clock, true, test_span()).unwrap();
            assert_eq!(generated.len(), 3);
            // One tick per generated ULID.
            assert_eq!(clock, 1_000_003);
        }
    }
}