1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use crate::commands::classified::expr::run_expression_block;
use crate::commands::classified::internal::run_internal_command;
use crate::context::Context;
use crate::prelude::*;
use crate::stream::InputStream;
use futures::stream::TryStreamExt;
use nu_errors::ShellError;
use nu_protocol::hir::{Block, ClassifiedCommand, Commands};
use nu_protocol::{ReturnSuccess, UntaggedValue, Value};
use std::sync::atomic::Ordering;

pub(crate) async fn run_block(
    block: &Block,
    ctx: &mut Context,
    mut input: InputStream,
    it: &Value,
    vars: &IndexMap<String, Value>,
    env: &IndexMap<String, String>,
) -> Result<InputStream, ShellError> {
    let mut output: Result<InputStream, ShellError> = Ok(InputStream::empty());
    for pipeline in &block.block {
        match output {
            Ok(inp) if inp.is_empty() => {}
            Ok(inp) => {
                let mut output_stream = inp.to_output_stream();

                loop {
                    match output_stream.try_next().await {
                        Ok(Some(ReturnSuccess::Value(Value {
                            value: UntaggedValue::Error(e),
                            ..
                        }))) => return Err(e),
                        Ok(Some(_item)) => {
                            if let Some(err) = ctx.get_errors().get(0) {
                                ctx.clear_errors();
                                return Err(err.clone());
                            }
                            if ctx.ctrl_c.load(Ordering::SeqCst) {
                                break;
                            }
                        }
                        Ok(None) => {
                            if let Some(err) = ctx.get_errors().get(0) {
                                ctx.clear_errors();
                                return Err(err.clone());
                            }
                            break;
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
            Err(e) => {
                return Err(e);
            }
        }
        output = run_pipeline(pipeline, ctx, input, it, vars, env).await;

        input = InputStream::empty();
    }

    output
}

/// Runs the commands of a single pipeline left to right, threading the stream
/// through them, and returns the stream produced by the final command.
///
/// Internal commands consume the current stream and produce the next one.
/// Expression commands do not take the current stream as an argument; their
/// result replaces it.
///
/// # Errors
///
/// Stops at the first failure: an error returned by a command, a command that
/// was classified as an error, or a dynamic command (reported as
/// unimplemented).
async fn run_pipeline(
    commands: &Commands,
    ctx: &mut Context,
    mut input: InputStream,
    it: &Value,
    vars: &IndexMap<String, Value>,
    env: &IndexMap<String, String>,
) -> Result<InputStream, ShellError> {
    // Each command is cloned because the runners below take ownership of it.
    for command in commands.list.iter().cloned() {
        input = match command {
            ClassifiedCommand::Internal(internal) => {
                run_internal_command(internal, ctx, input, it, vars, env).await?
            }

            ClassifiedCommand::Expr(expr) => {
                run_expression_block(*expr, ctx, it, vars, env).await?
            }

            ClassifiedCommand::Error(err) => return Err(err.into()),

            ClassifiedCommand::Dynamic(_) => {
                return Err(ShellError::unimplemented("Dynamic commands"))
            }
        };
    }

    Ok(input)
}