use crate::nodes;
use crate::type_aggregation;
use std::str::FromStr;
use regex::Regex;
grammar;
/// Grammar entry point: renders every parsed expression into the source text
/// of a single `async` block.
///
/// The generated block contains, in this order: struct and enum declarations,
/// function declarations, stream-variable declarations, one pinned `let`
/// binding per stream (returning streams, non-returning streams, then
/// stream-variable runners), and finally a `select_all` over all of those
/// bindings. When at least one stream returns, the merged stream is the value
/// of the block; otherwise the merged stream is collected inside the block.
pub Program: String = {
    <exprs: Expr*> => {
        /// Renders one `let <prefix>_<i> = Box::pin(<definition>);` line per definition.
        fn pinned_bindings<T: std::fmt::Display>(prefix: &str, definitions: &[T]) -> String {
            definitions
                .iter()
                .enumerate()
                .map(|(i, definition)| format!("let {prefix}_{i} = Box::pin({definition});\n"))
                .collect()
        }

        /// Renders the `<prefix>_<i>,` entries that refer back to those bindings.
        fn binding_names(prefix: &str, count: usize) -> String {
            (0..count).map(|i| format!("{prefix}_{i},\n")).collect()
        }

        let mut output = "async {
use ::marigold::marigold_impl::*;
".to_string();

        // Helpers have to be emitted before any stream is built: first the
        // enums and structs, then the functions, then the stream variables.
        // Every match below stays exhaustive on purpose, so that adding a
        // variant to `TypedExpression` forces a decision here.
        let enums_and_structs: String = exprs
            .iter()
            .filter_map(|expr| match expr {
                nodes::TypedExpression::StructDeclaration(declaration) => Some(declaration.code()),
                nodes::TypedExpression::EnumDeclaration(declaration) => Some(declaration.code()),
                nodes::TypedExpression::UnnamedReturningStream(_)
                | nodes::TypedExpression::UnnamedNonReturningStream(_)
                | nodes::TypedExpression::NamedReturningStream(_)
                | nodes::TypedExpression::NamedNonReturningStream(_)
                | nodes::TypedExpression::StreamVariable(_)
                | nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_)
                | nodes::TypedExpression::FnDeclaration(_) => None,
            })
            .map(|s| format!("{s}\n\n"))
            .collect();
        output.push_str(&enums_and_structs);

        let functions: String = exprs
            .iter()
            .filter_map(|expr| match expr {
                nodes::TypedExpression::FnDeclaration(function) => Some(function.code()),
                nodes::TypedExpression::UnnamedReturningStream(_)
                | nodes::TypedExpression::UnnamedNonReturningStream(_)
                | nodes::TypedExpression::NamedReturningStream(_)
                | nodes::TypedExpression::NamedNonReturningStream(_)
                | nodes::TypedExpression::StructDeclaration(_)
                | nodes::TypedExpression::EnumDeclaration(_)
                | nodes::TypedExpression::StreamVariable(_)
                | nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_) => None,
            })
            .map(|s| format!("{s}\n\n"))
            .collect();
        output.push_str(&functions);

        let stream_variable_declarations: String = exprs
            .iter()
            .filter_map(|expr| match expr {
                nodes::TypedExpression::StreamVariable(variable) => Some(variable.declaration_code()),
                nodes::TypedExpression::StreamVariableFromPriorStreamVariable(variable) => Some(variable.declaration_code()),
                nodes::TypedExpression::UnnamedReturningStream(_)
                | nodes::TypedExpression::UnnamedNonReturningStream(_)
                | nodes::TypedExpression::NamedReturningStream(_)
                | nodes::TypedExpression::NamedNonReturningStream(_)
                | nodes::TypedExpression::StructDeclaration(_)
                | nodes::TypedExpression::EnumDeclaration(_)
                | nodes::TypedExpression::FnDeclaration(_) => None,
            })
            .map(|s| format!("{s}\n\n"))
            .collect();
        output.push_str(&stream_variable_declarations);

        // Streams that end in `return`.
        let returning_streams: Vec<_> = exprs
            .iter()
            .filter_map(|expr| match expr {
                nodes::TypedExpression::UnnamedReturningStream(stream) => Some(stream.code()),
                nodes::TypedExpression::NamedReturningStream(stream) => Some(stream.code()),
                nodes::TypedExpression::UnnamedNonReturningStream(_)
                | nodes::TypedExpression::NamedNonReturningStream(_)
                | nodes::TypedExpression::StructDeclaration(_)
                | nodes::TypedExpression::EnumDeclaration(_)
                | nodes::TypedExpression::StreamVariable(_)
                | nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_)
                | nodes::TypedExpression::FnDeclaration(_) => None,
            })
            .collect();
        output.push_str(&pinned_bindings("returning_stream", &returning_streams));

        // Streams that end in any other output function.
        let non_returning_streams: Vec<_> = exprs
            .iter()
            .filter_map(|expr| match expr {
                nodes::TypedExpression::UnnamedNonReturningStream(stream) => Some(stream.code()),
                nodes::TypedExpression::NamedNonReturningStream(stream) => Some(stream.code()),
                nodes::TypedExpression::UnnamedReturningStream(_)
                | nodes::TypedExpression::NamedReturningStream(_)
                | nodes::TypedExpression::StructDeclaration(_)
                | nodes::TypedExpression::EnumDeclaration(_)
                | nodes::TypedExpression::StreamVariable(_)
                | nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_)
                | nodes::TypedExpression::FnDeclaration(_) => None,
            })
            .collect();
        output.push_str(&pinned_bindings("non_returning_stream", &non_returning_streams));

        // Runner code contributed by each stream variable.
        let stream_variable_runners: Vec<_> = exprs
            .iter()
            .filter_map(|expr| match expr {
                nodes::TypedExpression::StreamVariable(variable) => Some(variable.runner_code()),
                nodes::TypedExpression::StreamVariableFromPriorStreamVariable(variable) => Some(variable.runner_code()),
                nodes::TypedExpression::UnnamedReturningStream(_)
                | nodes::TypedExpression::UnnamedNonReturningStream(_)
                | nodes::TypedExpression::NamedReturningStream(_)
                | nodes::TypedExpression::NamedNonReturningStream(_)
                | nodes::TypedExpression::StructDeclaration(_)
                | nodes::TypedExpression::EnumDeclaration(_)
                | nodes::TypedExpression::FnDeclaration(_) => None,
            })
            .collect();
        output.push_str(&pinned_bindings("stream_variable_runners", &stream_variable_runners));

        // The `vec![...]` literal that names every binding emitted above.
        let mut streams_string = String::from("vec![\n");
        streams_string += &binding_names("returning_stream", returning_streams.len());
        streams_string += &binding_names("non_returning_stream", non_returning_streams.len());
        streams_string += &binding_names("stream_variable_runners", stream_variable_runners.len());
        streams_string += "]\n";

        let has_returning_streams = !returning_streams.is_empty();
        if has_returning_streams {
            // The item type is left for the compiler to infer through a
            // generic identity function that is emitted next to the vector.
            output.push_str(
                format!("
/// silly function that uses generics to infer the output type (StreamItem) via generics, so that
/// we can provide the streams as an array of Pin<Box<dyn Stream<Item=StreamItem>>>.
#[inline(always)]
fn typed_stream_vec<StreamItem>(v: Vec<core::pin::Pin<Box<dyn futures::Stream<Item=StreamItem>>>>) -> Vec<core::pin::Pin<Box<dyn futures::Stream<Item=StreamItem>>>> {{
v
}}
").as_str()
            );
            output.push_str(&format!("let streams_array = typed_stream_vec({streams_string});"));
        } else {
            // Without a returning stream the item type is spelled out as `()`.
            output.push_str(&format!("let streams_array: Vec<core::pin::Pin<Box<dyn futures::Stream<Item=()>>>> = {streams_string};"));
        }

        output.push_str("let mut all_streams = ::marigold::marigold_impl::futures::stream::select_all(streams_array);");
        if has_returning_streams {
            // The merged stream is the value of the generated block.
            output.push_str("all_streams\n");
        } else {
            // Collecting drives the merged stream to completion inside the block.
            output.push_str("all_streams.collect::<Vec<()>>().await;\n");
        }
        output.push_str("}\n");
        output
    }
}
// A single top-level expression of a program. Every alternative below is a
// nonterminal that already produces a `nodes::TypedExpression`, so the value
// is passed through without an action.
Expr: nodes::TypedExpression = {
Stream,
StreamVariableDeclaration,
StructDeclaration,
EnumDeclaration,
FnDeclaration
}
/// Catch-all token for bare words such as variable names; it exists to work
/// around terminal ambiguity. Matches one or more word characters, `>`, `<`
/// or `-`, and returns the matched text as an owned string.
FreeText: String = {
    <word: r"[><\w\-]+"> => String::from(word)
}
/// A brace-delimited span of raw text, returned with its surrounding braces
/// still attached.
// NOTE(review): `.` normally does not match a line break, so this presumably
// relies on the braced body arriving on a single line; confirm against the
// code that prepares the parser input.
BracedText: String = {
    <braced: r#"\{.*}"#> => String::from(braced)
}
// Helper nonterminal used to work around terminal ambiguity. Accepts either a
// quoted string literal (for example "hello"), which is returned with its
// quotes, or a bare word such as a variable name.
QuotedFreeText: String = {
    <literal: r#""[0-9A-Za-z/._\-\w]+""#> => String::from(literal),
    FreeText,
}
/// `struct Name { field: type, ... }`: a struct declaration.
///
/// The braced body arrives as one raw token, so the individual fields are
/// pulled out here with regular expressions instead of grammar rules.
StructDeclaration: nodes::TypedExpression = {
    "struct" <struct_name: FreeText> <field_declarations: BracedText> => {
        lazy_static! {
            static ref WHITESPACE: Regex =
                Regex::new(r#"[\s]+"#).unwrap();
            static ref FIELD_DECLARATION: Regex =
                Regex::new(r#"([\S]+)[\s]*:[\s]*(.*)"#).unwrap();
        }

        // Collapse every run of whitespace into a single space.
        let normalized = WHITESPACE.replace_all(&field_declarations, " ");
        // Drop the opening and closing brace.
        let body = &normalized[1..normalized.len() - 1];

        let mut fields = Vec::new();
        for declaration in body.split(",") {
            // Pieces that do not look like `name: type` are skipped.
            if let Some(captures) = FIELD_DECLARATION.captures(declaration) {
                let field_type = crate::nodes::Type::from_str(&captures[2])
                    .expect("could not parse type in struct definition");
                fields.push((captures[1].to_string(), field_type));
            }
        }

        nodes::TypedExpression::from(crate::nodes::StructDeclarationNode {
            name: struct_name,
            fields,
        })
    }
}
/// `name: value` as a `(name, value)` pair of bare words.
StructFieldDeclaration: (String, String) = {
    <FreeText> ":" <FreeText> => (<>)
}
/// `enum Name { ... }`: an enum declaration. The name and the raw braced body
/// are handed to `nodes::parse_enum`, which builds the expression.
EnumDeclaration: nodes::TypedExpression = {
    "enum" <name: FreeText> <body: BracedText> => nodes::parse_enum(name, body),
}
/// One enum field: either `name = value`, where the value is a quoted literal
/// or a bare word, or just `name`, in which case the value is `None`.
EnumFieldDeclaration: (String, Option<String>) = {
    <name: FreeText> "=" <value: QuotedFreeText> => (name, Some(value)),
    <name: FreeText> => (name, None)
}
/// One function parameter, `name: type` or `name: &type`, as a
/// `(name, type)` pair. A leading `&` is kept as part of the type text.
FnParameter: (String, String) = {
    <parameter_name: FreeText> ":" <amp: "&"?> <parameter_type: FreeText> => {
        let full_type = if amp.is_some() {
            format!("&{parameter_type}")
        } else {
            parameter_type
        };
        (parameter_name, full_type)
    }
}
/// `fn name(param: type, ...) -> output type`: a function signature.
///
/// Parameters are comma separated, with or without a trailing comma. The
/// output type is read as a sequence of bare words that are joined with single
/// spaces; any word containing `string_<size>` is replaced by the matching
/// `ArrayString<size>` path.
FnSignature: nodes::FunctionSignature = {
    "fn" <name: FreeText> "("
        <parameters: (FnParameter ",")*>
        <maybe_trailing_parameter: FnParameter?>
    ")" "->" <output_type: FreeText*> => {
        lazy_static! {
            static ref STRING: Regex = Regex::new(r"string_([0-9_A-Za-z]+)").unwrap();
        }

        // Drop the comma tokens, then append the final parameter if there is one.
        let mut all_parameters: Vec<_> = parameters
            .into_iter()
            .map(|(parameter, _comma_literal)| parameter)
            .collect();
        all_parameters.extend(maybe_trailing_parameter);

        let output_words: Vec<String> = output_type
            .into_iter()
            .map(|word| {
                // The size is extracted first so that the borrow of `word`
                // has ended before `word` may be returned unchanged.
                let string_size = STRING.captures(&word).map(|string_def| {
                    let size_str = string_def
                        .get(1)
                        .expect("Could not find size definition for string field");
                    u32::from_str(size_str.as_str())
                        .expect("Could not parse string size in struct. Must be parsable as U32.")
                });
                match string_size {
                    Some(size) => format!("::marigold::marigold_impl::arrayvec::ArrayString<{size}>"),
                    None => word,
                }
            })
            .collect();

        nodes::FunctionSignature {
            name,
            parameters: all_parameters,
            output_type: output_words.join(" "),
        }
    }
}
/// A function declaration: a signature followed by a body token. The body is
/// everything from the `%%%MARIGOLD_FUNCTION_START%%%` marker through the
/// `%%%MARIGOLD_FUNCTION_END%%%` marker and is stored with both markers.
FnDeclaration: nodes::TypedExpression = {
    <signature: FnSignature> <body: r"%%%MARIGOLD_FUNCTION_START%%%[\s\S]*%%%MARIGOLD_FUNCTION_END%%%"> => {
        // The signature is owned here, so its parts are moved into the node.
        let declaration = nodes::FnDeclarationNode {
            name: signature.name,
            parameters: signature.parameters,
            output_type: signature.output_type,
            body: String::from(body),
        };
        nodes::TypedExpression::from(declaration)
    }
}
/// A complete stream: a source, zero or more `.`-chained stream functions and
/// a final `.`-chained output function. The source is either an input
/// function (unnamed stream) or the name of a stream variable (named stream).
Stream: nodes::TypedExpression = {
    <inp: InputFunction> <funs: ("." <StreamFunction>)*> "." <out: OutputFunction> => {
        let inp_and_funs = nodes::InputAndMaybeStreamFunctions { inp: inp, funs: funs };
        nodes::TypedExpression::from(nodes::UnnamedStreamNode { inp_and_funs, out })
    },
    <stream_variable: FreeText> <funs: ("." <StreamFunction>)*> "." <out: OutputFunction> => {
        let named_stream = nodes::NamedStreamNode {
            stream_variable: stream_variable,
            funs: funs,
            out: out,
        };
        nodes::TypedExpression::from(named_stream)
    }
}
/// `name = <source>.<fn>...`: binds a stream without an output function to a
/// variable. The source is either an input function or the name of an
/// earlier stream variable.
StreamVariableDeclaration: nodes::TypedExpression = {
    <field_name: FreeText> "=" <inp: InputFunction> <funs: ("." <StreamFunction>)*> => {
        let variable = nodes::StreamVariableNode {
            variable_name: field_name,
            inp,
            funs,
        };
        nodes::TypedExpression::from(variable)
    },
    <field_name: FreeText> "=" <stream_variable: FreeText> <funs: ("." <StreamFunction>)*> => {
        let variable = nodes::StreamVariableFromPriorStreamVariableNode {
            variable_name: field_name,
            prior_stream_variable: stream_variable,
            funs,
        };
        nodes::TypedExpression::from(variable)
    }
}
/// The source of a stream. Every alternative yields a
/// `nodes::InputFunctionNode` holding the generated code for the source plus
/// what is known about its variability and item count.
InputFunction: nodes::InputFunctionNode = {
    // `range(start, end)`: a constant source with a known number of items.
    // Both bounds must parse as integers and `end` must not be below `start`;
    // anything else panics while parsing.
    "range(" <n1: FreeText> "," <n2: FreeText> ")" => {
        let end = n2.parse::<num_bigint::BigInt>().expect("could not parse input as integer");
        let start = n1.parse::<num_bigint::BigInt>().expect("could not parse input as integer");
        nodes::InputFunctionNode {
            variability: nodes::InputVariability::Constant,
            input_count: nodes::InputCount::Known(
                (end - start)
                    .to_biguint()
                    .expect("range end must not be smaller than range start")
            ),
            code: format!("::marigold::marigold_impl::futures::stream::iter({n1}..{n2})"),
        }
    },
    // `read_file(path, csv, struct=Name)`: deserializes CSV rows into `Name`.
    // Gzip decompression is inferred from a quoted path literal whose text
    // after the last `.` is `gz`.
    "read_file(" <path: QuotedFreeText> "," "csv" "," "struct" "=" <deserialization_struct: FreeText> ")" => {
        // Only a quoted literal can carry a file extension. A bare word is
        // passed through to the generated code as written and is never
        // sliced, so short names cannot cause an out-of-range slice here.
        let inferred_gzip = path
            .strip_prefix("\"")
            .and_then(|rest| rest.strip_suffix("\""))
            .and_then(|literal| literal.rsplit('.').next())
            == Some("gz");
        let code = if inferred_gzip {
            format!("
::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader(
::marigold::marigold_impl::async_compression::tokio::bufread::GzipDecoder::new(
::marigold::marigold_impl::tokio::io::BufReader::new(
::marigold::marigold_impl::tokio::fs::File::open({path})
.await
.expect(\"Marigold could not open file\")
)
).compat()
).into_deserialize::<{deserialization_struct}>()
")
        } else {
            format!("
::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader(
::marigold::marigold_impl::tokio::fs::File::open({path})
.await
.expect(\"Marigold could not open file\")
.compat()
).into_deserialize::<{deserialization_struct}>()
")
        };
        nodes::InputFunctionNode {
            variability: nodes::InputVariability::Variable,
            input_count: nodes::InputCount::Unknown,
            code,
        }
    },
    // `read_file(path, csv, struct=Name, infer_compression=false)`: always
    // reads the file as uncompressed CSV, whatever the path looks like.
    "read_file(" <path: QuotedFreeText> "," "csv" "," "struct" "=" <deserialization_struct: FreeText> "," "infer_compression" "=" "false" ")" => nodes::InputFunctionNode {
        variability: nodes::InputVariability::Variable,
        input_count: nodes::InputCount::Unknown,
        code: format!("
::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader(
::marigold::marigold_impl::tokio::fs::File::open({path})
.await
.expect(\"Marigold could not open file\")
.compat()
).into_deserialize::<{deserialization_struct}>()
")
    },
    // `select_all(stream, stream, ...)`: merges several streams into one
    // source. Variability and item count are aggregated over the inputs of
    // the merged streams. A trailing comma is accepted.
    "select_all" "(" <selected_streams: (InputAndMaybeStreamFunctions ",")*> <last_selected_stream: InputAndMaybeStreamFunctions?> ")" => {
        // Drop the comma tokens, then append the final stream if there is one.
        let mut streams = selected_streams
            .into_iter()
            .map(|(stream, _comma_literal)| stream)
            .collect::<Vec<_>>();
        streams.extend(last_selected_stream);

        let stream_code = streams
            .iter()
            .map(|stream| {
                let code = stream.code();
                format!("::marigold::marigold_impl::run_stream::run_stream({code})")
            })
            .collect::<Vec<_>>()
            .join(",\n");

        nodes::InputFunctionNode {
            variability: type_aggregation::aggregate_input_variability(streams.iter().map(|s| s.inp.variability.clone())),
            input_count: type_aggregation::aggregate_input_count(streams.iter().map(|s| s.inp.input_count.clone())),
            code: format!("::marigold::marigold_impl::futures::prelude::stream::select_all::select_all([{stream_code}])"),
        }
    }
}
/// A stream source followed by zero or more `.`-chained stream functions and
/// no output function.
InputAndMaybeStreamFunctions: nodes::InputAndMaybeStreamFunctions = {
    <inp: InputFunction> <funs: ("." <StreamFunction>)*> =>
        nodes::InputAndMaybeStreamFunctions { inp, funs }
}
/// One chained stream operation. Every alternative yields a
/// `nodes::StreamFunctionNode` whose `code` is the text of the call to emit
/// for that operation.
///
/// Several alternatives pick their template with `#[cfg(...)]`. Those feature
/// checks are evaluated when this grammar crate is compiled: with the `tokio`
/// or `async-std` feature the templates use `buffered` with a limit of
/// `max(2 * (num_cpus - 1), 2)`, otherwise they use the plain adapter.
StreamFunction: nodes::StreamFunctionNode = {
    // `permutations(n)`
    "permutations("<n: FreeText> ")" => nodes::StreamFunctionNode {
        code: format!("permutations({n}).await"),
    },
    // `permutations_with_replacement(n)`: collects the values and expands
    // them through the `nested_iter_yield!` macro.
    "permutations_with_replacement("<n: FreeText> ")" => nodes::StreamFunctionNode {
        code: format!("collect_and_apply(|values| async {{
::marigold::marigold_impl::gen_nested_iter_yield::nested_iter_yield!(values.iter(), {n}, .to_owned(), ::marigold::marigold_impl::)
}})
.await
.await
"),
    },
    // `combinations(n)`
    "combinations("<n: FreeText> ")" => nodes::StreamFunctionNode {
        code: format!("combinations({n}).await"),
    },
    // `keep_first_n(n, value_fn)`
    "keep_first_n(" <n: FreeText> "," <value_fn: FreeText> ")" => nodes::StreamFunctionNode {
        code: format!("keep_first_n({n}, {value_fn}).await"),
    },
    // `filter(predicate)`: keeps the items for which the predicate is true.
    // The predicate receives a reference to the item in both variants.
    "filter(" <filter_fn: FreeText> ")" => {
        #[cfg(not(any(feature = "tokio", feature = "async-std")))]
        return nodes::StreamFunctionNode {
            // todo: the filter types do not compile when given an actual async
            // function, so a sync function is wrapped in an already-completed
            // future until the filter types are updated.
            code: format!("filter(|v| ::marigold::marigold_impl::futures::future::ready({filter_fn}(v)))"),
        };
        // Both branches of the generated `if` are spelled out with `else`: an
        // `if` without `else` has to evaluate to `()`, so a block yielding
        // `Some(v)` followed by a bare `None` is a type error.
        // NOTE(review): confirm that the trailing `.filter_map(|v| v)` resolves
        // to an adapter that accepts a closure returning a plain `Option`.
        #[cfg(any(feature = "tokio", feature = "async-std"))]
        return nodes::StreamFunctionNode {
            code: format!("map(|v| async move {{
if {filter_fn}(&v) {{
Some(v)
}} else {{
None
}}
}})
.buffered(
std::cmp::max(
2 * (::marigold::marigold_impl::num_cpus::get() - 1),
2
)
)
.filter_map(|v| v)"),
        };
    },
    // `filter_map(f)`: maps each item through `f` and keeps the `Some` results.
    "filter_map(" <filter_map_fn: FreeText> ")" => {
        #[cfg(not(any(feature = "tokio", feature = "async-std")))]
        return nodes::StreamFunctionNode {
            code: format!("filter_map(|v| ::marigold::marigold_impl::futures::future::ready({filter_map_fn}(v)))")
        };
        #[cfg(any(feature = "tokio", feature = "async-std"))]
        return nodes::StreamFunctionNode {
            code: format!("
map(|v| ::marigold::marigold_impl::futures::future::ready({filter_map_fn}(v)))
.buffered(
std::cmp::max(
2 * (::marigold::marigold_impl::num_cpus::get() - 1),
2
)
)
.filter_map(|v| v)
")
        };
    },
    // `map(f)`: applies `f` to every item.
    "map(" <mapping_fn: FreeText> ")" => {
        #[cfg(any(feature = "tokio", feature = "async-std"))]
        return nodes::StreamFunctionNode {
            code: format!("map(|v| async move {{{mapping_fn}(v)}}).buffered(std::cmp::max(2 * (::marigold::marigold_impl::num_cpus::get() - 1), 2))"),
        };
        #[cfg(not(any(feature = "tokio", feature = "async-std")))]
        return nodes::StreamFunctionNode {
            code: format!("map({mapping_fn})"),
        };
    },
    // `fold(state, f)`: an initial state that parses as a number is used as
    // written; any other text is treated as a constructor and called with no
    // arguments.
    "fold(" <state: FreeText> "," <fun: FreeText> ")" => {
        let number_or_constructor = match state.trim().parse::<f64>() {
            Ok(_) => state,
            Err(_) => format!("{state}()")
        };
        nodes::StreamFunctionNode {
            code: format!("marifold({number_or_constructor}, |acc, x| futures::future::ready({fun}(acc, x))).await"),
        }
    },
    // `ok()`: drops the `Err` items and unwraps the remaining ones.
    "ok()" => nodes::StreamFunctionNode {
        code: format!("filter(|r| futures::future::ready(r.is_ok()))
.map(|r| r.unwrap())"),
    },
    // `ok_or_panic()`: unwraps every item, so the generated code panics on
    // the first `Err`.
    "ok_or_panic()" => nodes::StreamFunctionNode {
        code: "map(|r| r.unwrap())".to_string(),
    }
}
// The final operation of a stream. Every alternative yields a
// `nodes::OutputFunctionNode` made of two pieces of generated code,
// `stream_prefix` and `stream_postfix`, plus a `returning` flag.
//
// In the `write_file` alternatives the prefix ends with
// `let mut stream_to_write =` and the postfix starts with `;`, so the stream
// expression is presumably spliced between the two by the code that consumes
// this node (not visible in this file chunk; verify against the node types).
//
// All `write_file` templates share the same shape:
//   * create the parent directory of the output path if it has one,
//   * store a CSV serializer in a function-local static `OnceCell` behind a
//     tokio `Mutex`,
//   * serialize every stream item and yield nothing (`None`),
//   * once the stream is finished, swap the serializer for one backed by
//     `Writer::vector()` and shut down the writer that was taken out.
// The gzip templates differ only by wrapping the writer in a `GzipEncoder`.
OutputFunction: nodes::OutputFunctionNode = {
// `return`: nothing is wrapped around the stream; it is marked as returning.
"return" => nodes::OutputFunctionNode {
stream_prefix: "".to_string(),
stream_postfix: "".to_string(),
returning: true
},
// `write_file(path, csv)`: the compression is inferred from the path. The
// path text still includes its closing quote here, which is why the check
// below looks for `.gz"`.
"write_file(" <path: QuotedFreeText> "," "csv" ")" => {
if path.ends_with(".gz\"") {
// Gzip-compressed CSV output.
return nodes::OutputFunctionNode {
stream_prefix: format!("{{
if let Some(parent) = ::std::path::Path::new({path}).parent() {{
::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
.await
.expect(\"could not create parent directory for output file\");
}}
static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
::marigold::marigold_impl::tokio::sync::Mutex<
::marigold::marigold_impl::csv_async::AsyncSerializer<
::marigold::marigold_impl::tokio_util::compat::Compat<
::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder<
::marigold::marigold_impl::writer::Writer
>
>
>
>
> = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();
WRITER.set(
::marigold::marigold_impl::tokio::sync::Mutex::new(
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
::marigold::marigold_impl::writer::Writer::file(
::marigold::marigold_impl::tokio::fs::File::create({path})
.await
.expect(\"Could not write to file\")
)
)
.compat_write()
)
)
).expect(\"Could not put CSV writer into OnceCell\");
let mut stream_to_write =
"),
stream_postfix: "
;
stream_to_write.filter_map(
|v| async move {
WRITER
.get()
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await
.serialize(v)
.await
.expect(\"could not write record to CSV\");
None
}
).chain(
// after the stream is complete, flush the writer.
::marigold::marigold_impl::futures::stream::iter(0..1)
.filter_map(|_v| async {
let mut serializer_guard = WRITER
.get() // gets Mutex<...>
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await;
let mut serializer = std::mem::replace(
&mut *serializer_guard,
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
::marigold::marigold_impl::writer::Writer::vector()
)
.compat_write()
)
);
serializer
.into_inner()
.await
.expect(\"Could not get underlying writer from serializer\")
.get_mut()
.shutdown()
.await
.expect(\"Could not shut down underlying writer\");
None
})
)
}".to_string(),
returning: false
};
} else {
// Uncompressed CSV output.
return nodes::OutputFunctionNode {
stream_prefix: format!("{{
if let Some(parent) = ::std::path::Path::new({path}).parent() {{
::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
.await
.expect(\"could not create parent directory for output file\");
}}
static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
::marigold::marigold_impl::tokio::sync::Mutex<
::marigold::marigold_impl::csv_async::AsyncSerializer<
::marigold::marigold_impl::tokio_util::compat::Compat<
::marigold::marigold_impl::writer::Writer
>
>
>
> = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();
WRITER.set(
::marigold::marigold_impl::tokio::sync::Mutex::new(
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::writer::Writer::file(
::marigold::marigold_impl::tokio::fs::File::create({path})
.await
.expect(\"Could not write to file\")
)
.compat_write()
)
)
).expect(\"Could not put CSV writer into OnceCell\");
let mut stream_to_write =
"),
stream_postfix: "
;
stream_to_write.filter_map(
|v| async move {
WRITER
.get()
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await
.serialize(v)
.await
.expect(\"could not write record to CSV\");
None
}
).chain(
// after the stream is complete, flush the writer.
::marigold::marigold_impl::futures::stream::iter(0..1)
.filter_map(|_v| async {
let mut serializer_guard = WRITER
.get() // gets Mutex<...>
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await;
let mut serializer = std::mem::replace(
&mut *serializer_guard,
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::writer::Writer::vector()
.compat_write()
)
);
serializer
.into_inner()
.await
.expect(\"Could not get underlying writer from serializer\")
.get_mut()
.shutdown()
.await
.expect(\"Could not shut down underlying writer\");
None
})
)
}".to_string(),
returning: false
}
};
},
// `write_file(path, csv, compression=none)`: uncompressed CSV output,
// whatever the path looks like.
"write_file(" <path: QuotedFreeText> "," "csv" "," "compression" "=" "none" ")" => nodes::OutputFunctionNode {
stream_prefix: format!("{{
if let Some(parent) = ::std::path::Path::new({path}).parent() {{
::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
.await
.expect(\"could not create parent directory for output file\");
}}
static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
::marigold::marigold_impl::tokio::sync::Mutex<
::marigold::marigold_impl::csv_async::AsyncSerializer<
::marigold::marigold_impl::tokio_util::compat::Compat<
::marigold::marigold_impl::writer::Writer
>
>
>
> = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();
WRITER.set(
::marigold::marigold_impl::tokio::sync::Mutex::new(
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::writer::Writer::file(
::marigold::marigold_impl::tokio::fs::File::create({path})
.await
.expect(\"Could not write to file\")
).compat_write()
)
)
).expect(\"Could not put CSV writer into OnceCell\");
let mut stream_to_write =
"),
stream_postfix: "
;
stream_to_write.filter_map(
|v| async move {
WRITER
.get()
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await
.serialize(v)
.await
.expect(\"could not write record to CSV\");
None
}
).chain(
// after the stream is complete, flush the writer.
::marigold::marigold_impl::futures::stream::iter(0..1)
.filter_map(|_v| async {
let mut serializer_guard = WRITER
.get() // gets Mutex<...>
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await;
let mut serializer = std::mem::replace(
&mut *serializer_guard,
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::writer::Writer::vector()
.compat_write()
)
);
serializer
.into_inner()
.await
.expect(\"Could not get underlying writer from serializer\")
.get_mut()
.shutdown()
.await
.expect(\"Could not shut down underlying writer\");
None
})
)
}".to_string(),
returning: false,
},
// `write_file(path, csv, compression=gz)`: gzip-compressed CSV output,
// whatever the path looks like.
"write_file(" <path: QuotedFreeText> "," "csv" "," "compression" "=" "gz" ")" => nodes::OutputFunctionNode {
stream_prefix: format!("{{
if let Some(parent) = ::std::path::Path::new({path}).parent() {{
::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
.await
.expect(\"could not create parent directory for output file\");
}}
static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
::marigold::marigold_impl::tokio::sync::Mutex<
::marigold::marigold_impl::csv_async::AsyncSerializer<
::marigold::marigold_impl::tokio_util::compat::Compat<
::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder<
::marigold::marigold_impl::writer::Writer
>
>
>
>
> = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();
WRITER.set(
::marigold::marigold_impl::tokio::sync::Mutex::new(
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
::marigold::marigold_impl::writer::Writer::file(
::marigold::marigold_impl::tokio::fs::File::create({path})
.await
.expect(\"Could not write to file\")
)
)
.compat_write()
)
)
).expect(\"Could not put CSV writer into OnceCell\");
let mut stream_to_write =
"),
stream_postfix: "
;
stream_to_write.filter_map(
|v| async move {
WRITER
.get()
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await
.serialize(v)
.await
.expect(\"could not write record to CSV\");
None
}
).chain(
// after the stream is complete, flush the writer.
::marigold::marigold_impl::futures::stream::iter(0..1)
.filter_map(|_v| async {
let mut serializer_guard = WRITER
.get() // gets Mutex<...>
.expect(\"Could not get CSV writer from OnceCell\")
.lock()
.await;
let mut serializer = std::mem::replace(
&mut *serializer_guard,
::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
::marigold::marigold_impl::writer::Writer::vector()
)
.compat_write()
)
);
serializer
.into_inner()
.await
.expect(\"Could not get underlying writer from serializer\")
.get_mut()
.shutdown()
.await
.expect(\"Could not shut down underlying writer\");
None
})
)
}".to_string(),
returning: false,
}
}