cross-stream 0.13.4

An event stream store for personal, local-first use, specializing in event sourcing.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
use nu_cli::{add_cli_context, gather_parent_env_vars};
use nu_cmd_lang::create_default_context;
use nu_command::add_shell_command_context;
use nu_engine::eval_block_with_early_return;
use nu_parser::parse;
use nu_protocol::debugger::WithoutDebug;
use nu_protocol::engine::{Closure, Command, EngineState, Redirection, Stack, StateWorkingSet};
use nu_protocol::engine::{Job, ThreadJob};
use nu_protocol::shell_error::generic::GenericError;
use nu_protocol::{OutDest, PipelineData, ShellError, Span, Value};

use std::sync::{Arc, Mutex};

use serde_json::Value as JsonValue;

use crate::error::Error;
use crate::nu::commands;
use crate::store::{Frame, Store};

/// A cloneable wrapper around a Nushell [`EngineState`].
///
/// All methods on `Engine` read or mutate the wrapped `state`; cloning an
/// `Engine` clones that state.
#[derive(Clone)]
pub struct Engine {
    /// The wrapped Nushell engine state (declarations, environment, jobs).
    pub state: EngineState,
}

impl Engine {
    /// Build a fresh engine.
    ///
    /// Starts from `create_default_context()`, layers the shell-command and
    /// CLI contexts on top, loads the Nushell standard library, and finally
    /// calls `gather_parent_env_vars` with the process's current directory.
    ///
    /// # Errors
    /// Returns an error if loading the standard library fails or if the
    /// current working directory cannot be determined.
    pub fn new() -> Result<Self, Error> {
        let mut state = add_cli_context(add_shell_command_context(create_default_context()));
        nu_std::load_standard_library(&mut state)?;

        let cwd = std::env::current_dir()?;
        gather_parent_env_vars(&mut state, cwd.as_ref());

        Ok(Self { state })
    }

    /// Register every command in `commands` as a declaration on this engine.
    ///
    /// The declarations are collected in a scratch working set and merged
    /// into `self.state` in a single delta.
    ///
    /// # Errors
    /// Returns an error if merging the delta into the engine state fails.
    pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
        let delta = {
            let mut working_set = StateWorkingSet::new(&self.state);
            commands.into_iter().for_each(|decl| {
                working_set.add_decl(decl);
            });
            working_set.render()
        };
        self.state.merge_delta(delta)?;
        Ok(())
    }

    pub fn add_alias(&mut self, name: &str, target: &str) -> Result<(), Error> {
        let mut working_set = StateWorkingSet::new(&self.state);
        let _ = parse(
            &mut working_set,
            None,
            format!("alias {name} = {target}").as_bytes(),
            false,
        );
        self.state.merge_delta(working_set.render())?;
        Ok(())
    }

    /// Parse and evaluate `expression` with `input` as its pipeline input.
    ///
    /// Evaluation runs against a clone of `self.state`, so declarations the
    /// expression introduces are merged into that clone only and `self` is
    /// left untouched. Stdout is redirected to a separate pipe for the
    /// duration of the evaluation.
    ///
    /// # Errors
    /// Returns the CLI-formatted text of the first parse error, of a failure
    /// to merge the parsed delta, or of the evaluation error.
    pub fn eval(&self, input: PipelineData, expression: String) -> Result<PipelineData, String> {
        let mut working_set = StateWorkingSet::new(&self.state);
        let block = parse(&mut working_set, None, expression.as_bytes(), false);

        if let Some(parse_error) = working_set.parse_errors.first() {
            return Err(nu_protocol::format_cli_error(
                None,
                &working_set,
                parse_error,
                None,
            ));
        }

        // Evaluate on a private copy so this call never mutates `self`.
        let mut engine_state = self.state.clone();
        if let Err(merge_error) = engine_state.merge_delta(working_set.render()) {
            let scratch = StateWorkingSet::new(&self.state);
            return Err(nu_protocol::format_cli_error(
                None,
                &scratch,
                &merge_error,
                None,
            ));
        }

        let mut base_stack = Stack::new();
        let mut stack =
            base_stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);

        let outcome =
            eval_block_with_early_return::<WithoutDebug>(&engine_state, &mut stack, &block, input);
        match outcome {
            Ok(exec_data) => Ok(exec_data.body),
            Err(eval_error) => {
                let scratch = StateWorkingSet::new(&engine_state);
                Err(nu_protocol::format_cli_error(
                    None,
                    &scratch,
                    &eval_error,
                    None,
                ))
            }
        }
    }

    pub fn parse_closure(&mut self, script: &str) -> Result<Closure, Box<ShellError>> {
        let mut working_set = StateWorkingSet::new(&self.state);
        let block = parse(&mut working_set, None, script.as_bytes(), false);
        self.state
            .merge_delta(working_set.render())
            .map_err(Box::new)?;

        let mut stack = Stack::new();
        let result = eval_block_with_early_return::<WithoutDebug>(
            &self.state,
            &mut stack,
            &block,
            PipelineData::empty(),
        )
        .map_err(Box::new)?;
        let closure = result
            .body
            .into_value(Span::unknown())
            .map_err(Box::new)?
            .into_closure()
            .map_err(Box::new)?;

        self.state.merge_env(&mut stack).map_err(Box::new)?;

        Ok(closure)
    }

    /// Load `content` into this engine as a Nushell module called `name`.
    ///
    /// The module source is written to `{name}.nu` inside a fresh temporary
    /// directory, imported with a `use <path>` statement, and evaluated; the
    /// resulting declarations and environment changes are merged into
    /// `self.state`. The temporary directory is removed when this function
    /// returns.
    ///
    /// # Errors
    /// Returns an error if the temporary directory or file cannot be created,
    /// if the `use` statement fails to parse, or if merging or evaluating it
    /// fails.
    pub fn add_module(&mut self, name: &str, content: &str) -> Result<(), Box<ShellError>> {
        // Shared shape for the two filesystem failures below.
        let io_error = |detail: String| {
            Box::new(ShellError::Generic(GenericError::new_internal(
                "I/O Error",
                detail,
            )))
        };

        // The directory (and the module file in it) lives until `scratch_dir`
        // is dropped at the end of this function.
        let scratch_dir = tempfile::TempDir::new().map_err(|e| {
            io_error(format!(
                "Failed to create temporary directory for module '{name}': {e}"
            ))
        })?;
        let module_file = scratch_dir.path().join(format!("{name}.nu"));
        std::fs::write(&module_file, content).map_err(|e| io_error(e.to_string()))?;

        // Import the file through an ordinary `use` statement.
        let mut working_set = StateWorkingSet::new(&self.state);
        let import = format!("use {}", module_file.display());
        let block = parse(&mut working_set, None, import.as_bytes(), false);

        if let Some(first_error) = working_set.parse_errors.first() {
            return Err(Box::new(ShellError::Generic(GenericError::new(
                "Parse error",
                first_error.to_string(),
                first_error.span(),
            ))));
        }

        // Publish the parsed declarations to the engine state.
        self.state
            .merge_delta(working_set.render())
            .map_err(Box::new)?;

        // Evaluate the `use` on a throwaway stack; only its env changes are kept.
        let mut stack = Stack::new();
        let _ = eval_block_with_early_return::<WithoutDebug>(
            &self.state,
            &mut stack,
            &block,
            PipelineData::empty(),
        )
        .map_err(Box::new)?;

        self.state.merge_env(&mut stack).map_err(Box::new)?;

        Ok(())
    }

    pub fn with_env_vars(
        mut self,
        vars: impl IntoIterator<Item = (String, String)>,
    ) -> Result<Self, Error> {
        for (key, value) in vars {
            self.state
                .add_env_var(key, nu_protocol::Value::string(value, Span::unknown()));
        }

        Ok(self)
    }

    /// Set the base metadata stamped on frames this engine's `.append` writes
    /// (`service_id`, `{action_id, frame_id}`, ...). Injected as `$env` so the
    /// `.append` decl stays instance-independent. See `append_command`.
    pub fn set_append_meta(&mut self, meta: &JsonValue) {
        self.state.add_env_var(
            crate::nu::commands::append_command::APPEND_META_ENV.to_string(),
            Value::string(meta.to_string(), Span::unknown()),
        );
    }

    /// Evaluate `closure` inside a freshly registered background `ThreadJob`.
    ///
    /// `args` are bound to the closure's positional parameters in order
    /// (required first, then optional); optional parameters that receive no
    /// argument get their declared default, or `nothing` if they have none.
    /// `pipeline_input` is the closure's pipeline input and defaults to empty.
    /// `job_name` is passed to `ThreadJob::new` and used in error messages.
    ///
    /// While the closure runs, the new job is installed as the engine's
    /// current background job; the previous one is restored afterwards. On
    /// success the environment changes made by the closure are merged back
    /// into `self.state` (a failure to merge is logged, not returned). The job
    /// is left in the jobs table after the call.
    ///
    /// # Errors
    /// Returns an error if `args` has fewer entries than the closure has
    /// required parameters or more than it has parameters in total, or if
    /// evaluation fails. Argument-count errors are detected before any job is
    /// created, so a rejected call leaves the engine state untouched.
    pub fn run_closure_in_job(
        &mut self,
        closure: &nu_protocol::engine::Closure,
        args: Vec<Value>,
        pipeline_input: Option<PipelineData>,
        job_name: impl Into<String>,
    ) -> Result<PipelineData, Box<ShellError>> {
        let job_display_name = job_name.into(); // Convert job_name early for error messages
        let provided = args.len();

        // -- validate the argument count BEFORE touching any job state ---
        // Returning early after the job was registered and attached would leak
        // the job entry and leave `current_job` pointing at it.
        {
            let block = self.state.get_block(closure.block_id);
            let num_required = block.signature.required_positional.len();
            let total_positional = num_required + block.signature.optional_positional.len();

            if provided > total_positional {
                return Err(Box::new(ShellError::Generic(GenericError::new(
                    format!(
                        "Too many arguments for job '{job_display_name}': got {}, closure accepts at most {total_positional}.",
                        provided
                    ),
                    format!("Closure signature: {name}", name = block.signature.name),
                    block.span.unwrap_or_else(Span::unknown),
                ))));
            }

            if provided < num_required {
                return Err(Box::new(ShellError::Generic(GenericError::new(
                    format!(
                        "Job '{job_display_name}' run closure expects {num_required} required argument(s), but {} were provided.",
                        provided
                    ),
                    format!("Closure signature: {name}", name = block.signature.name),
                    block.span.unwrap_or_else(Span::unknown),
                ))));
            }
        }

        // -- create & register job ---
        let (sender, _rx) = std::sync::mpsc::channel();
        let job = ThreadJob::new(
            self.state.signals().clone(),
            Some(job_display_name.clone()),
            sender,
        );
        {
            let mut jobs = self.state.jobs.lock().unwrap();
            jobs.add_job(Job::Thread(job.clone()));
        }

        // -- temporarily attach the job to self.state, remembering the old one ---
        let saved_bg_job = self.state.current_job.background_thread_job.replace(job);

        // -- prepare stack & inject positional arguments ---
        let block = self.state.get_block(closure.block_id);
        let mut stack = Stack::new();
        let mut stack =
            stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);

        let num_required = block.signature.required_positional.len();
        let num_optional = block.signature.optional_positional.len();

        // Bind provided args: the first `num_required` go to required params,
        // the remainder to optional params. Values are moved, not cloned.
        for (i, val) in args.into_iter().enumerate() {
            let param = if i < num_required {
                &block.signature.required_positional[i]
            } else {
                &block.signature.optional_positional[i - num_required]
            };
            if let Some(var_id) = param.var_id {
                stack.add_var(var_id, val);
            }
        }

        // Set default values for optional params not covered by provided args
        let optional_covered = provided.saturating_sub(num_required);
        for param in &block.signature.optional_positional[optional_covered..num_optional] {
            if let Some(var_id) = param.var_id {
                let default = param
                    .default_value
                    .clone()
                    .unwrap_or_else(|| Value::nothing(Span::unknown()));
                stack.add_var(var_id, default);
            }
        }

        let eval_pipeline_input = pipeline_input.unwrap_or_else(PipelineData::empty);

        // -- run using eval_block_with_early_return ---
        let eval_res = nu_engine::eval_block_with_early_return::<WithoutDebug>(
            &self.state,
            &mut stack,
            block,
            eval_pipeline_input,
        );

        // -- merge env (best effort) and restore the previous background job ---
        if eval_res.is_ok() {
            if let Err(e) = self.state.merge_env(&mut stack) {
                tracing::error!(
                    "Failed to merge environment from job '{}': {}",
                    job_display_name,
                    e
                );
            }
        }

        self.state.current_job.background_thread_job = saved_bg_job;
        eval_res.map(|exec_data| exec_data.body).map_err(Box::new)
    }

    /// Evaluate `closure` without creating or registering a `ThreadJob`.
    ///
    /// `run_closure_in_job` allocates an mpsc channel, builds a `ThreadJob`,
    /// locks `self.state.jobs` and adds the job -- without ever removing it --
    /// on every call. In a hot per-frame actor loop that churn, plus the
    /// unbounded growth of the jobs table, dominated the cost. This variant
    /// expects the caller to have attached one long-lived background job to
    /// `self.state` up front (see the actor `EngineWorker`) and only builds
    /// the stack, binds the positional arguments, and evaluates.
    ///
    /// Arguments are bound in order to the required and then the optional
    /// positional parameters; optional parameters without an argument get
    /// their declared default, or `nothing`. On success the closure's
    /// environment changes are merged into `self.state`; a merge failure is
    /// logged and does not fail the call.
    ///
    /// # Errors
    /// Returns an error if the number of `args` is outside the range the
    /// closure accepts, or if evaluation fails.
    pub fn eval_closure_no_job(
        &mut self,
        closure: &nu_protocol::engine::Closure,
        args: Vec<Value>,
        pipeline_input: Option<PipelineData>,
    ) -> Result<PipelineData, Box<ShellError>> {
        let block = self.state.get_block(closure.block_id);
        let signature = &block.signature;
        let num_required = signature.required_positional.len();
        let total_positional = num_required + signature.optional_positional.len();

        if args.len() > total_positional {
            return Err(Box::new(ShellError::Generic(GenericError::new(
                format!(
                    "Too many arguments for actor closure: got {}, closure accepts at most {total_positional}.",
                    args.len()
                ),
                format!("Closure signature: {name}", name = block.signature.name),
                block.span.unwrap_or_else(Span::unknown),
            ))));
        }

        if args.len() < num_required {
            return Err(Box::new(ShellError::Generic(GenericError::new(
                format!(
                    "Actor closure expects {num_required} required argument(s), but {} were provided.",
                    args.len()
                ),
                format!("Closure signature: {name}", name = block.signature.name),
                block.span.unwrap_or_else(Span::unknown),
            ))));
        }

        let mut base_stack = Stack::new();
        let mut stack =
            base_stack.push_redirection(Some(Redirection::Pipe(OutDest::PipeSeparate)), None);

        // Walk every positional parameter once, required first. The checks
        // above guarantee the supplied values run out only among the optional
        // parameters, which then fall back to their default (or `nothing`).
        let mut supplied = args.into_iter();
        let parameters = signature
            .required_positional
            .iter()
            .chain(signature.optional_positional.iter());
        for param in parameters {
            // Always advance, even for a parameter that has no variable slot.
            let given = supplied.next();
            let Some(var_id) = param.var_id else {
                continue;
            };
            let value = given.unwrap_or_else(|| {
                param
                    .default_value
                    .clone()
                    .unwrap_or_else(|| Value::nothing(Span::unknown()))
            });
            stack.add_var(var_id, value);
        }

        let outcome = nu_engine::eval_block_with_early_return::<WithoutDebug>(
            &self.state,
            &mut stack,
            block,
            pipeline_input.unwrap_or_else(PipelineData::empty),
        );

        match outcome {
            Ok(exec_data) => {
                if let Err(e) = self.state.merge_env(&mut stack) {
                    tracing::error!("Failed to merge environment from actor closure: {}", e);
                }
                Ok(exec_data.body)
            }
            Err(eval_error) => Err(Box::new(eval_error)),
        }
    }

    /// Create one background `ThreadJob`, register it in the jobs table, and
    /// make it this engine's current background job.
    ///
    /// Intended to be called once before a hot evaluation loop so that
    /// `eval_closure_no_job` can skip per-call job creation. The job is built
    /// from a clone of `self.state.signals()`, so signals still propagate.
    ///
    /// # Panics
    /// Panics if the jobs mutex is poisoned.
    pub fn attach_background_job(&mut self, name: impl Into<String>) {
        let (sender, _rx) = std::sync::mpsc::channel();
        let signals = self.state.signals().clone();
        let job = ThreadJob::new(signals, Some(name.into()), sender);

        // The guard is a temporary, so the lock is released at the end of
        // this statement.
        self.state
            .jobs
            .lock()
            .unwrap()
            .add_job(Job::Thread(job.clone()));

        self.state.current_job.background_thread_job = Some(job);
    }

    /// Kill and remove the first job in the jobs table whose description
    /// equals `name`.
    ///
    /// Does nothing if the jobs mutex cannot be locked or if no job matches;
    /// a failure reported by `kill_and_remove` is ignored.
    pub fn kill_job_by_name(&self, name: &str) {
        let Ok(mut jobs) = self.state.jobs.lock() else {
            return;
        };

        let target = jobs
            .iter()
            .find(|(_, job)| job.description().map_or(false, |desc| desc == name))
            .map(|(jid, _)| jid);

        if let Some(job_id) = target {
            let _ = jobs.kill_and_remove(job_id);
        }
    }
}

/// Add core cross.stream commands that are common across all Nushell pipeline runners
pub fn add_core_commands(engine: &mut Engine, store: &Store) -> Result<(), Error> {
    engine.add_commands(vec![
        Box::new(commands::cas_command::CasCommand::new(store.clone())),
        Box::new(commands::get_command::GetCommand::new(store.clone())),
        Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
        Box::new(commands::scru128_command::Scru128Command::new()),
    ])
}

/// Which `.cat`/`.last` flavor a pipeline runner exposes.
///
/// Consumed by `add_read_commands`, which registers one pair of read
/// commands per variant.
pub enum ReadMode {
    /// Streaming readers that support `--follow` (eval, actions, services).
    /// Registers `CatStreamCommand` and `LastStreamCommand`.
    Stream,
    /// Collected readers without follow (actors).
    /// Registers `CatCommand` and `LastCommand`.
    Plain,
}

/// How `.append` behaves. This is the one store command whose behaviour
/// genuinely varies by runner.
///
/// Consumed by `add_write_commands`, which picks the `.append`
/// implementation to register from the variant.
pub enum AppendMode {
    /// Write straight to the store. The per-instance base metadata is injected
    /// via the `XS_APPEND_META` env var (see `append_command`), not baked in.
    Direct,
    /// Buffer appended frames into `output` for the caller to flush (actors).
    /// The shared vector is handed to the buffered `.append` command.
    Buffered(Arc<Mutex<Vec<Frame>>>),
}

/// Register the `.cat` and `.last` read builtins for a pipeline runner.
pub fn add_read_commands(engine: &mut Engine, store: &Store, mode: ReadMode) -> Result<(), Error> {
    match mode {
        ReadMode::Stream => engine.add_commands(vec![
            Box::new(commands::cat_stream_command::CatStreamCommand::new(
                store.clone(),
            )),
            Box::new(commands::last_stream_command::LastStreamCommand::new(
                store.clone(),
            )),
        ]),
        ReadMode::Plain => engine.add_commands(vec![
            Box::new(commands::cat_command::CatCommand::new(store.clone())),
            Box::new(commands::last_command::LastCommand::new(store.clone())),
        ]),
    }
}

/// Register the write builtins for a pipeline runner: `.append` (per `mode`)
/// plus `.import` and `.cas-post`, which are identical across runners.
pub fn add_write_commands(
    engine: &mut Engine,
    store: &Store,
    mode: AppendMode,
) -> Result<(), Error> {
    engine.add_commands(vec![
        Box::new(commands::import_command::ImportCommand::new(store.clone())),
        Box::new(commands::cas_post_command::CasPostCommand::new(
            store.clone(),
        )),
    ])?;
    match mode {
        AppendMode::Direct => engine.add_commands(vec![Box::new(
            commands::append_command::AppendCommand::new(store.clone()),
        )]),
        AppendMode::Buffered(output) => engine.add_commands(vec![Box::new(
            commands::append_command_buffered::AppendCommand::new(store.clone(), output),
        )]),
    }
}

/// Build the module-free, instance-free base engine for a runner.
///
/// The result carries nushell + stdlib, the core builtins, the `.rm` alias,
/// and the read builtins selected by `read`; when `direct_write` is true it
/// also carries the write builtins with a direct `.append`. Build it once per
/// runner and `clone()` it per spawn or restart; `load_modules(as_of)` and
/// `set_append_meta(..)` specialize each clone. Actors pass
/// `direct_write: false` and add their per-instance buffered `.append` to the
/// clone.
///
/// # Errors
/// Returns an error if a default engine has to be built and that fails, or if
/// registering any command or the alias fails.
pub fn prepared_base(store: &Store, read: ReadMode, direct_write: bool) -> Result<Engine, Error> {
    // Start from the embedder's base engine when the store carries one,
    // otherwise from the default engine. See ADR 0007.
    let mut engine = if let Some(base) = store.base_engine() {
        Engine {
            state: base.clone(),
        }
    } else {
        Engine::new()?
    };

    add_core_commands(&mut engine, store)?;
    engine.add_alias(".rm", ".remove")?;
    add_read_commands(&mut engine, store, read)?;

    if direct_write {
        add_write_commands(&mut engine, store, AppendMode::Direct)?;
    }

    Ok(engine)
}