Skip to main content

ArrayHandle

Struct ArrayHandle 

Source
pub struct ArrayHandle<'a, T = StoreEntry> { /* private fields */ }
Expand description

A draft-time builder for populating an array entry in the Store.

Returned by Pipeline::array and Store::define_array. Methods are chainable so nested structures can be built in a single expression. Nested arrays and maps are added via push_array and push_map, which return child handles borrowing from this one.

Implementations§

Source§

impl<'a> ArrayHandle<'a, StoreEntry>

Source

pub fn push_array(&mut self) -> Result<ArrayHandle<'_, StoreEntry>, StoreError>

Appends a new nested array entry and returns a child handle borrowing from this one to populate it.

Source

pub fn push_map(&mut self) -> Result<MapHandle<'_, StoreEntry>, StoreError>

Appends a new nested map entry and returns a child handle borrowing from this one to populate it.

Examples found in repository?
examples/extend_store_nested.rs (line 13)
10fn write_query_results(store: &mut Store) -> Result<(), Box<dyn std::error::Error>> {
11    store.with_array("rows", |rows| {
12        // Row 1 — all fields inserted via chaining, no .into() needed.
13        let mut row = rows.push_map()?;
14        row.insert("user", "alice@contoso.com")?
15            .insert("ip", "10.0.0.1")?
16            .insert("status", "Success")?
17            .insert("attempts", 1i64)?;
18
19        // Row 2
20        let mut row = rows.push_map()?;
21        row.insert("user", "bob@contoso.com")?
22            .insert("ip", "192.168.1.50")?
23            .insert("status", "Failure")?
24            .insert("attempts", 3i64)?;
25
26        Ok(())
27    })?;
28
29    Ok(())
30}
31
32fn write_grouped_data(store: &mut Store) -> Result<(), Box<dyn std::error::Error>> {
33    store.with_map("by_severity", |groups| {
34        groups.with_array("high", |arr| {
35            arr.push("INC-001")?.push("INC-004")?;
36            Ok(())
37        })?;
38
39        groups.with_array("medium", |arr| {
40            arr.push("INC-002")?;
41            Ok(())
42        })?;
43
44        groups.with_array("low", |arr| {
45            arr.push("INC-003")?.push("INC-005")?.push("INC-006")?;
46            Ok(())
47        })?;
48
49        Ok(())
50    })?;
51
52    Ok(())
53}
54
55fn write_nested_config(store: &mut Store) -> Result<(), Box<dyn std::error::Error>> {
56    store.with_map("pipeline_config", |cfg| {
57        cfg.insert("name", "signin_analysis")?;
58
59        cfg.with_map("source", |src| {
60            src.insert("type", "kql")?
61                .insert("workspace", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")?;
62
63            src.with_map("options", |opts| {
64                opts.insert("timeout_secs", 30i64)?
65                    .insert("max_rows", 10_000i64)?;
66                Ok(())
67            })?;
68
69            Ok(())
70        })?;
71
72        cfg.with_array("outputs", |outputs| {
73            let mut o = outputs.push_map()?;
74            o.insert("format", "json")?
75                .insert("path", "/tmp/results.json")?;
76
77            let mut o = outputs.push_map()?;
78            o.insert("format", "csv")?
79                .insert("path", "/tmp/results.csv")?;
80
81            Ok(())
82        })?;
83
84        Ok(())
85    })?;
86
87    Ok(())
88}
More examples
Hide additional examples
examples/extend_store_collections.rs (line 80)
9fn main() -> Result<(), Box<dyn std::error::Error>> {
10    let mut store = Store::new();
11
12    // --- Chainable push ---
13    // push() accepts V: Into<StoreEntry>. The blanket impl
14    // `From<T: Into<Value>> for StoreEntry` means primitives work directly.
15    let mut arr = store.define_array("numbers")?.push(1)?.push(2)?.push(3)?;
16
17    let items = store.get("numbers")?.as_array()?;
18    println!("numbers: len={}", items.len());
19    assert_eq!(items.len(), 3);
20
21    // --- Chainable insert ---
22    // insert() accepts K: Into<String> and V: Into<StoreEntry>.
23    let mut map = store.define_map("config")?;
24    map.insert("host", "localhost")?
25        .insert("port", 8080i64)?
26        .insert("debug", true)?;
27
28    let host = store.get("config")?.get_key("host")?.get_value()?;
29    println!("config.host = {host:?}");
30
31    // --- Closure API on Store ---
32    // with_array scopes the handle so you don't need an explicit drop.
33    store.with_array("tags", |arr| {
34        arr.push("rust")?.push("pipeline")?.push("store")?;
35        Ok(())
36    })?;
37
38    let tags = store.get("tags")?.as_array()?;
39    println!("tags: len={}", tags.len());
40
41    store.with_map("metadata", |map| {
42        map.insert("version", "0.1.0")?.insert("stable", false)?;
43        Ok(())
44    })?;
45
46    let version = store.get("metadata")?.get_key("version")?.get_value()?;
47    println!("metadata.version = {version:?}");
48
49    // --- Nested construction via MapHandle closures ---
50    // with_array and with_map on MapHandle let you build nested structures
51    // without manually managing sub-handles.
52    store.with_map("server", |map| {
53        map.insert("name", "prod-01")?;
54
55        map.with_array("ports", |arr| {
56            arr.push(80)?.push(443)?;
57            Ok(())
58        })?;
59
60        map.with_map("tls", |tls| {
61            tls.insert("enabled", true)?
62                .insert("cert_path", "/etc/ssl/cert.pem")?;
63            Ok(())
64        })?;
65
66        Ok(())
67    })?;
68
69    let tls_enabled = store
70        .get("server")?
71        .get_key("tls")?
72        .get_key("enabled")?
73        .get_value()?;
74    println!("server.tls.enabled = {tls_enabled:?}");
75
76    // --- push_map on ArrayHandle ---
77    // Builds an array of maps — the most common pattern for tabular results.
78    let mut arr = store.define_array("users")?;
79    {
80        let mut user = arr.push_map()?;
81        user.insert("name", "Alice")?.insert("role", "admin")?;
82    }
83    {
84        let mut user = arr.push_map()?;
85        user.insert("name", "Bob")?.insert("role", "viewer")?;
86    }
87
88    let bob_role = store
89        .get("users")?
90        .get_index(1)?
91        .get_key("role")?
92        .get_value()?;
93    println!("users[1].role = {bob_role:?}");
94    assert_eq!(bob_role, &Value::Text("viewer".into()));
95
96    println!("store_collections: ok");
97    Ok(())
98}
Source§

impl<'a, T> ArrayHandle<'a, Vec<T>>

Source

pub fn push_array(&mut self) -> Result<ArrayHandle<'_, T>, StoreError>

Appends a new inner Vec<T> and returns a child array handle for populating it.

Source§

impl<'a, T> ArrayHandle<'a, HashMap<String, T>>

Source

pub fn push_map(&mut self) -> Result<MapHandle<'_, T>, StoreError>

Appends a new inner HashMap<String, T> and returns a child map handle for populating it.

Source§

impl<'a, T> ArrayHandle<'a, T>

Source

pub fn new<N: Into<String>>(name: N, data: &'a mut Vec<T>) -> Self

Wraps a mutable vector in a handle under the given display name. Intended for extension code; user code obtains handles through Pipeline::array or Store::define_array.

Source

pub fn push<V: Into<T>>(&mut self, value: V) -> Result<&mut Self, StoreError>

Appends a value to the array. Returns a mutable reference to this handle so that further calls can be chained.

Examples found in repository?
examples/extend_store_nested.rs (line 35)
32fn write_grouped_data(store: &mut Store) -> Result<(), Box<dyn std::error::Error>> {
33    store.with_map("by_severity", |groups| {
34        groups.with_array("high", |arr| {
35            arr.push("INC-001")?.push("INC-004")?;
36            Ok(())
37        })?;
38
39        groups.with_array("medium", |arr| {
40            arr.push("INC-002")?;
41            Ok(())
42        })?;
43
44        groups.with_array("low", |arr| {
45            arr.push("INC-003")?.push("INC-005")?.push("INC-006")?;
46            Ok(())
47        })?;
48
49        Ok(())
50    })?;
51
52    Ok(())
53}
More examples
Hide additional examples
examples/extend_store_collections.rs (line 15)
9fn main() -> Result<(), Box<dyn std::error::Error>> {
10    let mut store = Store::new();
11
12    // --- Chainable push ---
13    // push() accepts V: Into<StoreEntry>. The blanket impl
14    // `From<T: Into<Value>> for StoreEntry` means primitives work directly.
15    let mut arr = store.define_array("numbers")?.push(1)?.push(2)?.push(3)?;
16
17    let items = store.get("numbers")?.as_array()?;
18    println!("numbers: len={}", items.len());
19    assert_eq!(items.len(), 3);
20
21    // --- Chainable insert ---
22    // insert() accepts K: Into<String> and V: Into<StoreEntry>.
23    let mut map = store.define_map("config")?;
24    map.insert("host", "localhost")?
25        .insert("port", 8080i64)?
26        .insert("debug", true)?;
27
28    let host = store.get("config")?.get_key("host")?.get_value()?;
29    println!("config.host = {host:?}");
30
31    // --- Closure API on Store ---
32    // with_array scopes the handle so you don't need an explicit drop.
33    store.with_array("tags", |arr| {
34        arr.push("rust")?.push("pipeline")?.push("store")?;
35        Ok(())
36    })?;
37
38    let tags = store.get("tags")?.as_array()?;
39    println!("tags: len={}", tags.len());
40
41    store.with_map("metadata", |map| {
42        map.insert("version", "0.1.0")?.insert("stable", false)?;
43        Ok(())
44    })?;
45
46    let version = store.get("metadata")?.get_key("version")?.get_value()?;
47    println!("metadata.version = {version:?}");
48
49    // --- Nested construction via MapHandle closures ---
50    // with_array and with_map on MapHandle let you build nested structures
51    // without manually managing sub-handles.
52    store.with_map("server", |map| {
53        map.insert("name", "prod-01")?;
54
55        map.with_array("ports", |arr| {
56            arr.push(80)?.push(443)?;
57            Ok(())
58        })?;
59
60        map.with_map("tls", |tls| {
61            tls.insert("enabled", true)?
62                .insert("cert_path", "/etc/ssl/cert.pem")?;
63            Ok(())
64        })?;
65
66        Ok(())
67    })?;
68
69    let tls_enabled = store
70        .get("server")?
71        .get_key("tls")?
72        .get_key("enabled")?
73        .get_value()?;
74    println!("server.tls.enabled = {tls_enabled:?}");
75
76    // --- push_map on ArrayHandle ---
77    // Builds an array of maps — the most common pattern for tabular results.
78    let mut arr = store.define_array("users")?;
79    {
80        let mut user = arr.push_map()?;
81        user.insert("name", "Alice")?.insert("role", "admin")?;
82    }
83    {
84        let mut user = arr.push_map()?;
85        user.insert("name", "Bob")?.insert("role", "viewer")?;
86    }
87
88    let bob_role = store
89        .get("users")?
90        .get_index(1)?
91        .get_key("role")?
92        .get_value()?;
93    println!("users[1].role = {bob_role:?}");
94    assert_eq!(bob_role, &Value::Text("viewer".into()));
95
96    println!("store_collections: ok");
97    Ok(())
98}
examples/iter_array.rs (line 17)
12fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // ── Basic iter_array: iterate over items and capture each ──
14    println!("=== Basic iter_array ===");
15    {
16        let mut pipe = Pipeline::default();
17        pipe.array("numbers")?.push(10)?.push(20)?.push(30)?;
18
19        pipe.iter_array(
20            "process",
21            IterSource::array("numbers"),
22            |_index, item, body| {
23                body.step::<SetVar>(
24                    "capture",
25                    params!(
26                        "name" => "doubled",
27                        "value" => Param::reference(item),
28                    ),
29                )?;
30                Ok(())
31            },
32        )?;
33
34        pipe.hook(Logger::new().writer(std::io::stdout()));
35        pipe.hook(Timeout::new(Duration::from_secs(5)));
36
37        let complete = pipe.compile()?.run().wait()?;
38        complete.debug();
39    }
40
41    // ── iter_array with parent variable references ──
42    println!("\n=== iter_array referencing parent vars ===");
43    {
44        let mut pipe = Pipeline::default();
45        pipe.var("prefix", "item_")?;
46        pipe.array("items")?
47            .push("alpha")?
48            .push("beta")?
49            .push("gamma")?;
50
51        pipe.iter_array("loop", IterSource::array("items"), |_index, item, body| {
52            body.step::<SetVar>(
53                "combine",
54                params!(
55                    "name" => "label",
56                    "value" => Param::template(vec![
57                        Param::reference("prefix"),
58                        Param::reference(item),
59                    ]),
60                ),
61            )?;
62            Ok(())
63        })?;
64
65        let complete = pipe.compile()?.run().wait()?;
66        complete.debug();
67    }
68
69    // ── iter_array with multiple body steps ──
70    println!("\n=== iter_array with multi-step body ===");
71    {
72        let mut pipe = Pipeline::default();
73        pipe.var("suffix", "_processed")?;
74        pipe.array("words")?.push("hello")?.push("world")?;
75
76        pipe.iter_array("each", IterSource::array("words"), |_index, item, body| {
77            body.step::<SetVar>(
78                "tag",
79                params!(
80                    "name" => "tagged",
81                    "value" => Param::template(vec![
82                        Param::reference(item),
83                        Param::reference("suffix"),
84                    ]),
85                ),
86            )?;
87            body.step::<SetVar>(
88                "wrap",
89                params!(
90                    "name" => "wrapped",
91                    "value" => Param::template(vec![
92                        Param::literal("["),
93                        Param::reference("tagged"),
94                        Param::literal("]"),
95                    ]),
96                ),
97            )?;
98            Ok(())
99        })?;
100
101        let complete = pipe.compile()?.run().wait()?;
102        complete.debug();
103    }
104
105    // ── Error: iter_array with unresolved source ──
106    println!("\n=== iter_array unresolved source ===");
107    {
108        let mut pipe = Pipeline::default();
109
110        pipe.iter_array("loop", IterSource::array("nonexistent"), |_, _, _| Ok(()))?;
111
112        match pipe.compile() {
113            Err(e) => println!("  Caught: {}", e),
114            Ok(_) => println!("  ERROR: should have failed!"),
115        }
116    }
117
118    // ── Error: iter_array body references unknown variable ──
119    println!("\n=== iter_array unresolved body reference ===");
120    {
121        let mut pipe = Pipeline::default();
122        pipe.array("items")?.push(1)?;
123
124        pipe.iter_array("loop", IterSource::array("items"), |_, _, body| {
125            body.step::<SetVar>(
126                "bad",
127                params!(
128                    "name" => "out",
129                    "value" => Param::reference("ghost"),
130                ),
131            )?;
132            Ok(())
133        })?;
134
135        match pipe.compile() {
136            Err(e) => println!("  Caught: {}", e),
137            Ok(_) => println!("  ERROR: should have failed!"),
138        }
139    }
140
141    Ok(())
142}
examples/pipeline.rs (line 21)
13fn main() -> Result<(), Box<dyn std::error::Error>> {
14    // ── Happy path: derived outputs resolved at compile time ──
15    println!("=== Valid pipeline ===");
16    {
17        let mut pipe = Pipeline::default();
18
19        pipe.var("number", 1)?;
20        pipe.var("string", "example")?;
21        pipe.array("array")?.push(1)?.push(2)?.push(3)?;
22
23        // Derived output name from a variable ref: "string" → "example"
24        pipe.step::<SetVar>(
25            "set_var_1",
26            params!(
27                "name" => Param::reference("string"),
28                "value" => Param::reference("number"),
29            ),
30        )?;
31
32        // Derived output name from a literal: "greeting"
33        pipe.step::<SetVar>(
34            "set_var_2",
35            params!(
36                "name" => "greeting",
37                "value" => "hello world",
38            ),
39        )?;
40
41        // Chained: use derived output "greeting" from set_var_2 as a variable reference
42        pipe.step::<SetVar>(
43            "set_var_3",
44            params!(
45                "name" => "farewell",
46                "value" => Param::reference("greeting"),
47            ),
48        )?;
49
50        // Template referencing a derived output
51        pipe.step::<SetVar>(
52            "set_var_4",
53            params!(
54                "name" => "message",
55                "value" => Param::template(vec![
56                    Param::reference("greeting"),
57                    Param::literal(" and goodbye"),
58                ]),
59            ),
60        )?;
61
62        pipe.returns(
63            "summary",
64            params!(
65                "original_number" => Param::reference("number"),
66                "original_string" => Param::reference("string"),
67                "the_array" => Param::reference("array"),
68                "greeting" => Param::reference("greeting"),
69                "derived_from_ref" => Param::reference("example"),
70                "farewell" => Param::reference("farewell"),
71                "message" => Param::reference("message"),
72            ),
73        )?;
74
75        // Logger: writes each event to stdout
76        pipe.hook(Logger::new().writer(std::io::stdout()));
77
78        // Profiler: tracks per-step wall-clock duration
79        let profiler = Profiler::new().writer(std::io::stdout());
80        let timings = profiler.timings();
81        pipe.hook(profiler);
82
83        // EventLog: collects structured event records for post-mortem inspection
84        let event_log = EventLog::new();
85        let log = event_log.log();
86        pipe.hook(event_log);
87
88        // Timeout: aborts if pipeline exceeds 5 seconds
89        pipe.hook(Timeout::new(Duration::from_secs(5)));
90
91        // StoreValidator: assert "greeting" exists as Text after set_var_2
92        pipe.hook(
93            StoreValidator::new()
94                .after_step("set_var_2")
95                .expect("greeting", Type::Text),
96        );
97
98        let complete = pipe.compile()?.run().wait()?;
99        complete.debug();
100
101        // Read profiler timings programmatically
102        println!("\nProgrammatic timings:");
103        for (name, dur) in timings.lock().unwrap().iter() {
104            println!("  {} took {:.3}ms", name, dur.as_secs_f64() * 1000.0);
105        }
106
107        // Read event log
108        println!("\nEvent log ({} events):", log.lock().unwrap().len());
109        for record in log.lock().unwrap().iter() {
110            println!(
111                "  [{:.3}ms] {:?}{}",
112                record.elapsed.as_secs_f64() * 1000.0,
113                record.kind,
114                record
115                    .step_name
116                    .as_deref()
117                    .map(|s| format!(" ({})", s))
118                    .unwrap_or_default(),
119            );
120        }
121    }
122
123    // ── Error: step references a variable that doesn't exist ──
124    println!("\n=== Unresolved step reference ===");
125    {
126        let mut pipe = Pipeline::default();
127        pipe.var("number", 1)?;
128
129        pipe.step::<SetVar>(
130            "bad_step",
131            params!(
132                "name" => "output",
133                "value" => Param::reference("does_not_exist"),
134            ),
135        )?;
136
137        match pipe.compile() {
138            Err(e) => println!("  Caught: {}", e),
139            Ok(_) => println!("  ERROR: should have failed!"),
140        }
141    }
142
143    // ── Error: return references something that was never produced ──
144    println!("\n=== Unresolved return reference ===");
145    {
146        let mut pipe = Pipeline::default();
147        pipe.var("number", 1)?;
148
149        pipe.step::<SetVar>(
150            "ok_step",
151            params!(
152                "name" => "output",
153                "value" => Param::reference("number"),
154            ),
155        )?;
156
157        pipe.returns(
158            "result",
159            params!(
160                "good" => Param::reference("output"),
161                "bad" => Param::reference("never_created"),
162            ),
163        )?;
164
165        match pipe.compile() {
166            Err(e) => println!("  Caught: {}", e),
167            Ok(_) => println!("  ERROR: should have failed!"),
168        }
169    }
170
171    // ── Error: step references a later step's output (ordering) ──
172    println!("\n=== Forward reference (wrong order) ===");
173    {
174        let mut pipe = Pipeline::default();
175        pipe.var("base", "start")?;
176
177        // This step tries to reference "later_output" which doesn't exist yet
178        pipe.step::<SetVar>(
179            "step_1",
180            params!(
181                "name" => "early",
182                "value" => Param::reference("later_output"),
183            ),
184        )?;
185
186        pipe.step::<SetVar>(
187            "step_2",
188            params!(
189                "name" => "later_output",
190                "value" => Param::reference("base"),
191            ),
192        )?;
193
194        match pipe.compile() {
195            Err(e) => println!("  Caught: {}", e),
196            Ok(_) => println!("  ERROR: should have failed!"),
197        }
198    }
199
200    // ── Error: duplicate variable name ──
201    println!("\n=== Duplicate variable ===");
202    {
203        let mut pipe = Pipeline::default();
204        pipe.var("x", 1)?;
205        match pipe.var("x", 2) {
206            Err(e) => println!("  Caught: {}", e),
207            Ok(_) => println!("  ERROR: should have failed!"),
208        }
209    }
210
211    // ── Error: duplicate step name ──
212    println!("\n=== Duplicate step ===");
213    {
214        let mut pipe = Pipeline::default();
215        pipe.var("v", "val")?;
216
217        pipe.step::<SetVar>("same_name", params!("name" => "a", "value" => "b"))?;
218        match pipe.step::<SetVar>("same_name", params!("name" => "c", "value" => "d")) {
219            Err(e) => println!("  Caught: {}", e),
220            Ok(_) => println!("  ERROR: should have failed!"),
221        }
222    }
223
224    // ── StepFilter: deny a step from executing ──
225    println!("\n=== StepFilter deny ===");
226    {
227        let mut pipe = Pipeline::default();
228        pipe.var("x", 1)?;
229
230        pipe.step::<SetVar>("allowed_step", params!("name" => "a", "value" => "ok"))?;
231        pipe.step::<SetVar>(
232            "blocked_step",
233            params!("name" => "b", "value" => Param::reference("a")),
234        )?;
235
236        pipe.returns("result", params!("a" => Param::reference("a")))?;
237
238        pipe.hook(StepFilter::deny(["blocked_step"]));
239
240        match pipe.compile()?.run().wait() {
241            Err(e) => println!("  Caught: {}", e),
242            Ok(_) => println!("  ERROR: should have been denied!"),
243        }
244    }
245
246    Ok(())
247}

Auto Trait Implementations§

§

impl<'a, T> Freeze for ArrayHandle<'a, T>

§

impl<'a, T> RefUnwindSafe for ArrayHandle<'a, T>
where T: RefUnwindSafe,

§

impl<'a, T> Send for ArrayHandle<'a, T>
where T: Send,

§

impl<'a, T> Sync for ArrayHandle<'a, T>
where T: Sync,

§

impl<'a, T> Unpin for ArrayHandle<'a, T>

§

impl<'a, T> UnsafeUnpin for ArrayHandle<'a, T>

§

impl<'a, T> !UnwindSafe for ArrayHandle<'a, T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.