marigold-grammar 0.1.16

Grammar for the marigold language.
Documentation
use crate::nodes;
use crate::type_aggregation;
use std::str::FromStr;
use regex::Regex;

grammar;

pub Program: String = {
  Expr* => {
    let mut output = "async {
    use ::marigold::marigold_impl::*;
    ".to_string();

    // Before we can start streaming, we need to declare the helpers:
    // the enums and structs, functions, and then the stream variable declarations.
    let enums_and_structs = <>
      .iter()
      .filter_map(|expr| match expr {
        crate::nodes::TypedExpression::UnnamedReturningStream(_) => None,
        crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None,
        crate::nodes::TypedExpression::NamedReturningStream(_) => None,
        crate::nodes::TypedExpression::NamedNonReturningStream(_) => None,
        crate::nodes::TypedExpression::StructDeclaration(s) => Some(s.code()),
        crate::nodes::TypedExpression::EnumDeclaration(e) => Some(e.code()),
        crate::nodes::TypedExpression::StreamVariable(v) => None,
        crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(v) => None,
        crate::nodes::TypedExpression::FnDeclaration(f) => None,
      })
      .map(|s| format!("{s}\n\n"))
      .collect::<Vec<_>>()
      .join("");

    output.push_str(&enums_and_structs);

    let functions = <>
      .iter()
      .filter_map(|expr| match expr {
        crate::nodes::TypedExpression::UnnamedReturningStream(_) => None,
        crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None,
        crate::nodes::TypedExpression::NamedReturningStream(_) => None,
        crate::nodes::TypedExpression::NamedNonReturningStream(_) => None,
        crate::nodes::TypedExpression::StructDeclaration(_) => None,
        crate::nodes::TypedExpression::EnumDeclaration(_) => None,
        crate::nodes::TypedExpression::StreamVariable(_) => None,
        crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_) => None,
        crate::nodes::TypedExpression::FnDeclaration(f) => Some(f.code()),
      })
      .map(|s| format!("{s}\n\n"))
      .collect::<Vec<_>>()
      .join("");

    output.push_str(&functions);

    let stream_variable_declarations = <>
      .iter()
      .filter_map(|expr| match expr {
        crate::nodes::TypedExpression::UnnamedReturningStream(_) => None,
        crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None,
        crate::nodes::TypedExpression::NamedReturningStream(_) => None,
        crate::nodes::TypedExpression::NamedNonReturningStream(_) => None,
        crate::nodes::TypedExpression::StructDeclaration(s) => None,
        crate::nodes::TypedExpression::EnumDeclaration(e) => None,
        crate::nodes::TypedExpression::StreamVariable(v) => Some(v.declaration_code()),
        crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(v) => Some(v.declaration_code()),
        crate::nodes::TypedExpression::FnDeclaration(f) => None
      })
      .map(|s| format!("{s}\n\n"))
      .collect::<Vec<_>>()
      .join("");

    output.push_str(&stream_variable_declarations);

    let returning_stream_vec = <>
      .iter()
      .filter_map(
        |expr| match expr {
          crate::nodes::TypedExpression::UnnamedReturningStream(s) => Some(s.code()),
          crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None,
          crate::nodes::TypedExpression::NamedReturningStream(s) => Some(s.code()),
          crate::nodes::TypedExpression::NamedNonReturningStream(_) => None,
          crate::nodes::TypedExpression::StructDeclaration(_) => None,
          crate::nodes::TypedExpression::EnumDeclaration(_) => None,
          crate::nodes::TypedExpression::StreamVariable(_) => None,
          crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_) => None,
          crate::nodes::TypedExpression::FnDeclaration(f) => None,
      })
      .collect::<Vec<_>>();

    let n_returning_streams = returning_stream_vec.len();

    output.push_str(
      returning_stream_vec
        .iter()
        .zip(0..n_returning_streams)
        .map(|(stream_def, i)| format!("let returning_stream_{i} = Box::pin({stream_def});\n"))
        .collect::<Vec<_>>()
        .join("")
        .as_str()
    );

    let non_returning_streams = <>
      .iter()
      .filter_map(
        |expr| match expr {
          crate::nodes::TypedExpression::UnnamedReturningStream(_) => None,
          crate::nodes::TypedExpression::UnnamedNonReturningStream(s) => Some(s.code()),
          crate::nodes::TypedExpression::NamedReturningStream(_) => None,
          crate::nodes::TypedExpression::NamedNonReturningStream(s) => Some(s.code()),
          crate::nodes::TypedExpression::StructDeclaration(_) => None,
          crate::nodes::TypedExpression::EnumDeclaration(_) => None,
          crate::nodes::TypedExpression::StreamVariable(_) => None,
          crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(_) => None,
          crate::nodes::TypedExpression::FnDeclaration(f) => None,
      })
      .collect::<Vec<_>>();

    output.push_str(
      non_returning_streams
        .iter()
        .zip(0..non_returning_streams.len())
        .map(|(stream_def, i)| format!("let non_returning_stream_{i} = Box::pin({stream_def});\n"))
        .collect::<Vec<_>>()
        .join("")
        .as_str()
    );

    let stream_variable_runners = <>
      .iter()
      .filter_map(
        |expr| match expr {
          crate::nodes::TypedExpression::UnnamedReturningStream(_) => None,
          crate::nodes::TypedExpression::UnnamedNonReturningStream(_) => None,
          crate::nodes::TypedExpression::NamedReturningStream(_) => None,
          crate::nodes::TypedExpression::NamedNonReturningStream(_) => None,
          crate::nodes::TypedExpression::StructDeclaration(_) => None,
          crate::nodes::TypedExpression::EnumDeclaration(_) => None,
          crate::nodes::TypedExpression::StreamVariable(v) => Some(v.runner_code()),
          crate::nodes::TypedExpression::StreamVariableFromPriorStreamVariable(v) => Some(v.runner_code()),
          crate::nodes::TypedExpression::FnDeclaration(f) => None,
      })
      .collect::<Vec<_>>();

    output.push_str(
      stream_variable_runners
        .iter()
        .zip(0..stream_variable_runners.len())
        .map(|(stream_def, i)| format!("let stream_variable_runners_{i} = Box::pin({stream_def});\n"))
        .collect::<Vec<_>>()
        .join("")
        .as_str()
    );

    let mut streams_string = "vec![\n".to_string();

    streams_string.push_str(
      (0..n_returning_streams)
        .map(|i| format!("returning_stream_{i},\n"))
        .collect::<Vec<_>>()
        .join("")
        .as_str()
    );

    streams_string.push_str(
      (0..non_returning_streams.len())
        .map(|i| format!("non_returning_stream_{i},\n"))
        .collect::<Vec<_>>()
        .join("")
        .as_str()
    );

    streams_string.push_str(
      (0..stream_variable_runners.len())
        .map(|i| format!("stream_variable_runners_{i},\n"))
        .collect::<Vec<_>>()
        .join("")
        .as_str()
    );

    streams_string.push_str("]\n");

    if n_returning_streams > 0 {
      output.push_str(
        format!("
        /// silly function that uses generics to infer the output type (StreamItem) via generics, so that
        /// we can provide the streams as an array of Pin<Box<dyn Stream<Item=StreamItem>>>.
        #[inline(always)]
        fn typed_stream_vec<StreamItem>(v: Vec<core::pin::Pin<Box<dyn futures::Stream<Item=StreamItem>>>>) -> Vec<core::pin::Pin<Box<dyn futures::Stream<Item=StreamItem>>>> {{
         v
        }}
        ").as_str()
      );
      output.push_str(format!("let streams_array = typed_stream_vec({streams_string});").as_str());
    } else {
      output.push_str(format!("let streams_array:  Vec<core::pin::Pin<Box<dyn futures::Stream<Item=()>>>> = {streams_string};").as_str());
    }

    output.push_str("let mut all_streams = ::marigold::marigold_impl::futures::stream::select_all(streams_array);");

    if n_returning_streams == 0 {
      output.push_str("all_streams.collect::<Vec<()>>().await;\n");
      // ^ completes the stream; vec will always have a length of 0.
    } else {
      output.push_str("all_streams\n");
    }
    output.push_str("}\n");

    output
  }
}

Expr: nodes::TypedExpression = {
  Stream,
  StreamVariableDeclaration,
  StructDeclaration,
  EnumDeclaration,
  FnDeclaration
}

/// Nonsense nonterminal used to handle terminal ambiguity. Used for e.g. variable names.
FreeText: String = {
  <text: r"[><\w\-]+"> => text.to_string()
}

BracedText: String = {
  <text: r#"\{.*}"#> => text.to_string()
}

// nonsense struct used to handle terminal ambiguity. Allowed: variable name,
// or quoted string (string literal like: "hello").
QuotedFreeText: String = {
  <quoted_text: r#""[0-9A-Za-z/._\-\w]+""#> => quoted_text.to_string(),
  <variable_name: FreeText> => variable_name.to_string()
}

StructDeclaration: nodes::TypedExpression = {
  "struct" <struct_name: FreeText> <field_declarations: BracedText> =>
    nodes::TypedExpression::from(
      crate::nodes::StructDeclarationNode {
        name: struct_name,
        fields: {
          lazy_static! {
              static ref WHITESPACE: Regex =
                  Regex::new(r#"[\s]+"#).unwrap();

              static ref FIELD_DECLARATION: Regex =
                  Regex::new(r#"([\S]+)[\s]*:[\s]*(.*)"#).unwrap();
          }

          let cleaned = WHITESPACE
            .replace_all(&field_declarations, " ");

          cleaned[1..cleaned.len() - 1] // remove surrounding braces
            .split(",")
            .filter_map(|t| FIELD_DECLARATION
              .captures(t)
              .map(|c| (
                c[1].to_string(),
                crate::nodes::Type::from_str(&c[2])
                  .expect("could not parse type in struct definition")
              ))
            )
            .collect::<Vec<_>>()
        }
      }
    )
}

StructFieldDeclaration: (String, String) = {
 <field_name: FreeText> ":" <field_value: FreeText> => {
  (field_name, field_value)
 }
}

EnumDeclaration: nodes::TypedExpression = {
  "enum" <enum_name: FreeText> <enum_contents: BracedText> => nodes::parse_enum(enum_name, enum_contents),
}

EnumFieldDeclaration: (String, Option<String>) = {
 <field_name: FreeText> "=" <field_value: QuotedFreeText> => {
  (field_name, Some(field_value))
 },
 <field_name: FreeText> => {
  (field_name, None)
 }
}

FnParameter: (String, String) = {
  <parameter_name: FreeText> ":" <amp: "&"?> <parameter_type: FreeText> =>
    (parameter_name,
      match amp {
        Some(_) => format!("&{}", parameter_type),
        None => parameter_type
      }
    )
}

FnSignature: nodes::FunctionSignature = {
  "fn" <name: FreeText> "("
    <parameters: (FnParameter ",")*>
    <maybe_trailing_parameter: FnParameter?>
  ")" "->" <output_type: FreeText*> => nodes::FunctionSignature {
    name: name,
    parameters: {
      let mut cleaned_parameters = parameters
        .iter()
        .map(|(param, _comma_literal)| param.clone())
        .collect::<Vec<_>>();
      match maybe_trailing_parameter {
        Some(p) => cleaned_parameters.push(p),
        None => (),
      }
      cleaned_parameters
    },
    output_type: output_type.into_iter().map(
      |typ| {
        lazy_static! {
            static ref STRING: Regex = Regex::new(r"string_([0-9_A-Za-z]+)").unwrap();
        }
        if let Some(string_def) = STRING.captures(&typ) {
            let size_str = string_def
                .get(1)
                .expect("Could not find size definition for string field");
            let size = u32::from_str(size_str.as_str())
                .expect("Could not parse string size in struct. Must be parsable as U32.");
            return format!("::marigold::marigold_impl::arrayvec::ArrayString<{size}>");
        }
        typ
      }
    )
    .collect::<Vec<_>>()
    .join(" ")
  }
}

FnDeclaration: nodes::TypedExpression = {
  <signature: FnSignature> <body: r"%%%MARIGOLD_FUNCTION_START%%%[\s\S]*%%%MARIGOLD_FUNCTION_END%%%"> => nodes::TypedExpression::from(
    nodes::FnDeclarationNode {
      name: signature.name.clone(),
      parameters: signature.parameters.clone(),
      output_type: signature.output_type.clone(),
      body: body.to_string()
    }
  )
}

Stream: nodes::TypedExpression = {
  <inp: InputFunction> <funs:("." <StreamFunction>)*> "." <out: OutputFunction> =>
    nodes::TypedExpression::from(
      nodes::UnnamedStreamNode{
        inp_and_funs: nodes::InputAndMaybeStreamFunctions {
          inp,
          funs,
        },
        out: out
      }
    ),
  <stream_variable: FreeText> <funs:("." <StreamFunction>)*> "." <out: OutputFunction> =>
    nodes::TypedExpression::from(
      nodes::NamedStreamNode {
        stream_variable,
        funs,
        out
      }
    )
}

StreamVariableDeclaration: nodes::TypedExpression = {
  <field_name: FreeText> "=" <inp: InputFunction> <funs:("." <StreamFunction>)*>  =>
    nodes::TypedExpression::from(
      nodes::StreamVariableNode {
        variable_name: field_name,
        inp: inp,
        funs: funs
      }
    ),
  <field_name: FreeText> "=" <stream_variable: FreeText> <funs:("." <StreamFunction>)*>  =>
    nodes::TypedExpression::from(
      nodes::StreamVariableFromPriorStreamVariableNode {
        variable_name: field_name,
        prior_stream_variable: stream_variable,
        funs: funs
      }
    )
}

InputFunction: nodes::InputFunctionNode = {
  "range(" <n1: FreeText> "," <n2: FreeText> ")" => nodes::InputFunctionNode {
    variability: nodes::InputVariability::Constant,
    input_count: nodes::InputCount::Known((n2.parse::<num_bigint::BigInt>().expect("could not parse input as integer") - n1.parse::<num_bigint::BigInt>().expect("could not parse input as integer")).to_biguint().unwrap()),
    code: format!("::marigold::marigold_impl::futures::stream::iter({n1}..{n2})"),
  },
  "read_file(" <path: QuotedFreeText> "," "csv" "," "struct" "=" <deserialization_struct: FreeText> ")" => nodes::InputFunctionNode {
    variability: nodes::InputVariability::Variable,
    input_count: nodes::InputCount::Unknown,
    code: {
      match path[1..path.len() - 1].rsplit('.').next() {
         Some(postfix) => match postfix {
           "gz" => format!("
           ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader(
             ::marigold::marigold_impl::async_compression::tokio::bufread::GzipDecoder::new(
              ::marigold::marigold_impl::tokio::io::BufReader::new(
                ::marigold::marigold_impl::tokio::fs::File::open({path})
                  .await
                  .expect(\"Marigold could not open file\")
              )
             ).compat()
           ).into_deserialize::<{deserialization_struct}>()
           "),
           postfix => format!("
           ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader(
              ::marigold::marigold_impl::tokio::fs::File::open({path})
               .await
               .expect(\"Marigold could not open file\")
               .compat()
           ).into_deserialize::<{deserialization_struct}>()
           ")
         },
         None => format!("
         ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader(
          ::marigold::marigold_impl::tokio::fs::File::open({path})
           .await
           .expect(\"Marigold could not open file\")
           .compat()
         ).into_deserialize::<{deserialization_struct}>()
         ")
       }
    }
  },
  "read_file(" <path: QuotedFreeText> "," "csv" "," "struct" "=" <deserialization_struct: FreeText> "," "infer_compression" "=" "false" ")" => nodes::InputFunctionNode {
    variability: nodes::InputVariability::Variable,
    input_count: nodes::InputCount::Unknown,
    code: format!("
      ::marigold::marigold_impl::csv_async::AsyncDeserializer::from_reader(
         ::marigold::marigold_impl::tokio::fs::File::open({path})
          .await
          .expect(\"Marigold could not open file\")
          .compat()
      ).into_deserialize::<{deserialization_struct}>()
    ")
  },
  "select_all" "(" <selected_streams: (InputAndMaybeStreamFunctions ",")*> <last_selected_stream: InputAndMaybeStreamFunctions?> ")" => {
    let streams = {
      let mut streams = selected_streams
        .into_iter()
        .map(|(stream, _string_literal)| stream)
        .collect::<Vec<_>>();
      if let Some(trailing_stream) = last_selected_stream {
        streams.push(trailing_stream);
      }
      streams
    };


    nodes::InputFunctionNode {
      variability: type_aggregation::aggregate_input_variability(streams.iter().map(|s| s.inp.variability.clone())),
      input_count: type_aggregation::aggregate_input_count(streams.iter().map(|s| s.inp.input_count.clone())),
      code: {
        let stream_code = streams
          .iter()
          .map(|stream| {
            let code = stream.code();
            format!("::marigold::marigold_impl::run_stream::run_stream({code})")
          })
          .collect::<Vec<_>>()
          .join(",\n");
        format!("::marigold::marigold_impl::futures::prelude::stream::select_all::select_all([{stream_code}])")
      }
    }
  }
}

InputAndMaybeStreamFunctions: nodes::InputAndMaybeStreamFunctions = {
  <inp: InputFunction> <funs:("." <StreamFunction>)*> => nodes::InputAndMaybeStreamFunctions {
    inp: inp,
    funs: funs
  }
}

StreamFunction: nodes::StreamFunctionNode = {
  "permutations("<n: FreeText> ")" => nodes::StreamFunctionNode {
    code: format!("permutations({n}).await"),
  },
  "permutations_with_replacement("<n: FreeText> ")" => nodes::StreamFunctionNode {
    code: format!("collect_and_apply(|values| async {{
                ::marigold::marigold_impl::gen_nested_iter_yield::nested_iter_yield!(values.iter(), {n}, .to_owned(), ::marigold::marigold_impl::)
          }})
          .await
          .await
    "),
  },
  "combinations("<n: FreeText> ")" =>  nodes::StreamFunctionNode {
    code: format!("combinations({n}).await"),
  },
  "keep_first_n(" <n: FreeText> "," <value_fn: FreeText> ")" =>  nodes::StreamFunctionNode {
    code: format!("keep_first_n({n}, {value_fn}).await"),
  },
  "filter(" <filter_fn: FreeText> ")" => {
    #[cfg(not(any(feature = "tokio", feature = "async-std")))]
    return nodes::StreamFunctionNode {
      code: format!("filter(|v| ::marigold::marigold_impl::futures::future::ready({filter_fn}(v)))"),
      // todo: filters have a bad type that doesn't allow them to compile if passed
      // an actual async function, so wrap a sync function in a fake future until
      // the filter types are updated.
    };

    #[cfg(any(feature = "tokio", feature = "async-std"))]
    return nodes::StreamFunctionNode {
      code: format!("map(|v| async move {{
        if {filter_fn}(&v) {{
          Some(v)
        }}
        None
      }})
      .buffered(
        std::cmp::max(
          2 * (::marigold::marigold_impl::num_cpus::get() - 1),
          2
        )
      )
      .filter_map(|v| v)"),
    };
  },
  "filter_map(" <filter_map_fn: FreeText> ")" => {
    #[cfg(not(any(feature = "tokio", feature = "async-std")))]
    return nodes::StreamFunctionNode {
      code: format!("filter_map(|v| ::marigold::marigold_impl::futures::future::ready({filter_map_fn}(v)))")
    };

    #[cfg(any(feature = "tokio", feature = "async-std"))]
    return nodes::StreamFunctionNode {
      code: format!("
      map(|v| ::marigold::marigold_impl::futures::future::ready({filter_map_fn}(v)))
      .buffered(
        std::cmp::max(
          2 * (::marigold::marigold_impl::num_cpus::get() - 1),
          2
        )
      )
      .filter_map(|v| v)
      ")
    };
  },
  "map(" <mapping_fn: FreeText> ")" => {
    #[cfg(any(feature = "tokio", feature = "async-std"))]
    return nodes::StreamFunctionNode {
      code: format!("map(|v| async move {{{mapping_fn}(v)}}).buffered(std::cmp::max(2 * (::marigold::marigold_impl::num_cpus::get() - 1), 2))"),
    };

    #[cfg(not(any(feature = "tokio", feature = "async-std")))]
    return nodes::StreamFunctionNode {
      code: format!("map({mapping_fn})"),
    };
  },
  "fold(" <state: FreeText> "," <fun: FreeText> ")" => {

    let number_or_constructor = match state.trim().parse::<f64>() {
      Ok(_) => state,
      Err(_) => format!("{state}()")
    };

    nodes::StreamFunctionNode {
      code: format!("marifold({number_or_constructor}, |acc, x| futures::future::ready({fun}(acc, x))).await"),
    }
  },
  "ok()" => nodes::StreamFunctionNode {
    code: format!("filter(|r| futures::future::ready(r.is_ok()))
      .map(|r| r.unwrap())"),
  },
  "ok_or_panic()" => nodes::StreamFunctionNode {
    code: "map(|r| r.unwrap())".to_string(),
  }
}

OutputFunction: nodes::OutputFunctionNode = {
  "return" => nodes::OutputFunctionNode {
    stream_prefix: "".to_string(),
    stream_postfix: "".to_string(),
    returning: true
  },
  "write_file(" <path: QuotedFreeText> "," "csv" ")" => {
    if path.ends_with(".gz\"") {
      return nodes::OutputFunctionNode {
        stream_prefix: format!("{{
          if let Some(parent) = ::std::path::Path::new({path}).parent() {{
            ::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
              .await
              .expect(\"could not create parent directory for output file\");
          }}

          static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
            ::marigold::marigold_impl::tokio::sync::Mutex<
              ::marigold::marigold_impl::csv_async::AsyncSerializer<
                  ::marigold::marigold_impl::tokio_util::compat::Compat<
                    ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder<
                        ::marigold::marigold_impl::writer::Writer
                    >
                  >
              >
            >
          > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();

          WRITER.set(
            ::marigold::marigold_impl::tokio::sync::Mutex::new(
              ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
                ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
                  ::marigold::marigold_impl::writer::Writer::file(
                    ::marigold::marigold_impl::tokio::fs::File::create({path})
                       .await
                       .expect(\"Could not write to file\")
                  )
                )
                .compat_write()
              )
            )
          ).expect(\"Could not put CSV writer into OnceCell\");

          let mut stream_to_write =

         "),
        stream_postfix: "
            ;
            stream_to_write.filter_map(
              |v| async move {
                WRITER
                  .get()
                  .expect(\"Could not get CSV writer from OnceCell\")
                  .lock()
                  .await
                  .serialize(v)
                  .await
                  .expect(\"could not write record to CSV\");
                None
              }
            ).chain(
              // after the stream is complete, flush the writer.
              ::marigold::marigold_impl::futures::stream::iter(0..1)
                .filter_map(|_v| async {
                  let mut serializer_guard = WRITER
                    .get() // gets Mutex<...>
                    .expect(\"Could not get CSV writer from OnceCell\")
                    .lock()
                    .await;
                  let mut serializer = std::mem::replace(
                    &mut *serializer_guard,
                    ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
                      ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
                        ::marigold::marigold_impl::writer::Writer::vector()
                      )
                      .compat_write()
                    )
                  );
                  serializer
                    .into_inner()
                    .await
                    .expect(\"Could not get underlying writer from serializer\")
                    .get_mut()
                    .shutdown()
                    .await
                    .expect(\"Could not shut down underlying writer\");
                  None
                })
            )
        }".to_string(),
        returning: false
      };
    } else {
      return nodes::OutputFunctionNode {
        stream_prefix: format!("{{
            if let Some(parent) = ::std::path::Path::new({path}).parent() {{
              ::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
                .await
                .expect(\"could not create parent directory for output file\");
            }}

            static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
              ::marigold::marigold_impl::tokio::sync::Mutex<
                ::marigold::marigold_impl::csv_async::AsyncSerializer<
                  ::marigold::marigold_impl::tokio_util::compat::Compat<
                    ::marigold::marigold_impl::writer::Writer
                  >
                >
              >
            > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();

            WRITER.set(
              ::marigold::marigold_impl::tokio::sync::Mutex::new(
                ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
                  ::marigold::marigold_impl::writer::Writer::file(
                    ::marigold::marigold_impl::tokio::fs::File::create({path})
                       .await
                       .expect(\"Could not write to file\")
                  )
                  .compat_write()
                )
              )
            ).expect(\"Could not put CSV writer into OnceCell\");

            let mut stream_to_write =

           "),
          stream_postfix: "
              ;
              stream_to_write.filter_map(
                |v| async move {
                  WRITER
                    .get()
                    .expect(\"Could not get CSV writer from OnceCell\")
                    .lock()
                    .await
                    .serialize(v)
                    .await
                    .expect(\"could not write record to CSV\");
                  None
                }
              ).chain(
                // after the stream is complete, flush the writer.
                ::marigold::marigold_impl::futures::stream::iter(0..1)
                  .filter_map(|_v| async {
                      let mut serializer_guard = WRITER
                        .get() // gets Mutex<...>
                        .expect(\"Could not get CSV writer from OnceCell\")
                        .lock()
                        .await;
                      let mut serializer = std::mem::replace(
                        &mut *serializer_guard,
                        ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
                            ::marigold::marigold_impl::writer::Writer::vector()
                              .compat_write()
                        )
                      );
                      serializer
                        .into_inner()
                        .await
                        .expect(\"Could not get underlying writer from serializer\")
                        .get_mut()
                        .shutdown()
                        .await
                        .expect(\"Could not shut down underlying writer\");
                      None
                  })
              )
          }".to_string(),
          returning: false
        }
      };
  },
  "write_file(" <path: QuotedFreeText> "," "csv" "," "compression" "=" "none" ")" => nodes::OutputFunctionNode {
    stream_prefix: format!("{{
        if let Some(parent) = ::std::path::Path::new({path}).parent() {{
          ::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
            .await
            .expect(\"could not create parent directory for output file\");
        }}

        static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
          ::marigold::marigold_impl::tokio::sync::Mutex<
            ::marigold::marigold_impl::csv_async::AsyncSerializer<
              ::marigold::marigold_impl::tokio_util::compat::Compat<
                ::marigold::marigold_impl::writer::Writer
              >
            >
          >
        > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();

        WRITER.set(
          ::marigold::marigold_impl::tokio::sync::Mutex::new(
            ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
              ::marigold::marigold_impl::writer::Writer::file(
                ::marigold::marigold_impl::tokio::fs::File::create({path})
                   .await
                   .expect(\"Could not write to file\")
              ).compat_write()
            )
          )
        ).expect(\"Could not put CSV writer into OnceCell\");

        let mut stream_to_write =

       "),
      stream_postfix: "
          ;
          stream_to_write.filter_map(
            |v| async move {
              WRITER
                .get()
                .expect(\"Could not get CSV writer from OnceCell\")
                .lock()
                .await
                .serialize(v)
                .await
                .expect(\"could not write record to CSV\");
              None
            }
          ).chain(
            // after the stream is complete, flush the writer.
            ::marigold::marigold_impl::futures::stream::iter(0..1)
              .filter_map(|_v| async {
                let mut serializer_guard = WRITER
                  .get() // gets Mutex<...>
                  .expect(\"Could not get CSV writer from OnceCell\")
                  .lock()
                  .await;
                let mut serializer = std::mem::replace(
                  &mut *serializer_guard,
                  ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
                    ::marigold::marigold_impl::writer::Writer::vector()
                      .compat_write()
                  )
                );
                serializer
                  .into_inner()
                  .await
                  .expect(\"Could not get underlying writer from serializer\")
                  .get_mut()
                  .shutdown()
                  .await
                  .expect(\"Could not shut down underlying writer\");
                None
              })
          )
      }".to_string(),
      returning: false,
    },
    "write_file(" <path: QuotedFreeText> "," "csv" "," "compression" "=" "gz" ")" => nodes::OutputFunctionNode {
      stream_prefix: format!("{{
          if let Some(parent) = ::std::path::Path::new({path}).parent() {{
            ::marigold::marigold_impl::tokio::fs::create_dir_all(parent)
              .await
              .expect(\"could not create parent directory for output file\");
          }}

          static WRITER: ::marigold::marigold_impl::once_cell::sync::OnceCell<
            ::marigold::marigold_impl::tokio::sync::Mutex<
              ::marigold::marigold_impl::csv_async::AsyncSerializer<
                  ::marigold::marigold_impl::tokio_util::compat::Compat<
                    ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder<
                        ::marigold::marigold_impl::writer::Writer
                    >
                  >
              >
            >
          > = ::marigold::marigold_impl::once_cell::sync::OnceCell::new();

          WRITER.set(
            ::marigold::marigold_impl::tokio::sync::Mutex::new(
              ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
                ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
                  ::marigold::marigold_impl::writer::Writer::file(
                    ::marigold::marigold_impl::tokio::fs::File::create({path})
                       .await
                       .expect(\"Could not write to file\")
                  )
                )
                .compat_write()
              )
            )
          ).expect(\"Could not put CSV writer into OnceCell\");

          let mut stream_to_write =

         "),
        stream_postfix: "
            ;
            stream_to_write.filter_map(
              |v| async move {
                WRITER
                  .get()
                  .expect(\"Could not get CSV writer from OnceCell\")
                  .lock()
                  .await
                  .serialize(v)
                  .await
                  .expect(\"could not write record to CSV\");
                None
              }
            ).chain(
              // after the stream is complete, flush the writer.
              ::marigold::marigold_impl::futures::stream::iter(0..1)
                .filter_map(|_v| async {
                  let mut serializer_guard = WRITER
                    .get() // gets Mutex<...>
                    .expect(\"Could not get CSV writer from OnceCell\")
                    .lock()
                    .await;
                  let mut serializer = std::mem::replace(
                    &mut *serializer_guard,
                    ::marigold::marigold_impl::csv_async::AsyncSerializer::from_writer(
                      ::marigold::marigold_impl::async_compression::tokio::write::GzipEncoder::new(
                        ::marigold::marigold_impl::writer::Writer::vector()
                      )
                      .compat_write()
                    )
                  );
                  serializer
                    .into_inner()
                    .await
                    .expect(\"Could not get underlying writer from serializer\")
                    .get_mut()
                    .shutdown()
                    .await
                    .expect(\"Could not shut down underlying writer\");
                  None
                })
            )
        }".to_string(),
        returning: false,
      }
}