1use super::{Template, TemplateFile};
7use std::path::PathBuf;
8
9fn tf(path: &str, content: &str) -> TemplateFile {
10 TemplateFile {
11 relative_path: PathBuf::from(path),
12 content: content.to_string(),
13 }
14}
15
16pub fn transform_template() -> Template {
22 Template {
23 description: "Stateless data transformer".into(),
24 files: vec![
25 tf("Torvyn.toml", TRANSFORM_TORVYN_TOML),
26 tf("Cargo.toml", TRANSFORM_CARGO_TOML),
27 tf("wit/torvyn-streaming/types.wit", TORVYN_STREAMING_TYPES_WIT),
28 tf(
29 "wit/torvyn-streaming/processor.wit",
30 TORVYN_STREAMING_PROCESSOR_WIT,
31 ),
32 tf(
33 "wit/torvyn-streaming/buffer-allocator.wit",
34 TORVYN_STREAMING_BUFFER_ALLOCATOR_WIT,
35 ),
36 tf(
37 "wit/torvyn-streaming/lifecycle.wit",
38 TORVYN_STREAMING_LIFECYCLE_WIT,
39 ),
40 tf(
41 "wit/torvyn-streaming/world.wit",
42 TORVYN_STREAMING_TRANSFORM_WORLD_WIT,
43 ),
44 tf("src/lib.rs", TRANSFORM_LIB_RS),
45 tf(".gitignore", COMMON_GITIGNORE),
46 tf("README.md", TRANSFORM_README),
47 ],
48 }
49}
50
/// `Torvyn.toml` body for a single-component project: a `[torvyn]` table plus
/// one `[[component]]` entry rooted at `.` with language `rust`.
///
/// `{{project_name}}` and `{{contract_version}}` are placeholders; presumably
/// they are substituted by the template renderer, which is not visible here.
/// Used by the transform, router and aggregator templates.
const TRANSFORM_TORVYN_TOML: &str = r#"[torvyn]
name = "{{project_name}}"
version = "0.1.0"
contract_version = "{{contract_version}}"

[[component]]
name = "{{project_name}}"
path = "."
language = "rust"
"#;
61
/// `Cargo.toml` body for a single-component crate built as a `cdylib` with
/// `wit-bindgen` 0.36 as its only dependency.
///
/// `{{project_name}}` is a placeholder used for both the package name and the
/// `package.metadata.component` package id. Used by the transform, router and
/// aggregator templates.
const TRANSFORM_CARGO_TOML: &str = r#"[package]
name = "{{project_name}}"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
wit-bindgen = "0.36"

[package.metadata.component]
package = "{{project_name}}:component"
"#;
76
/// WIT source for the `types` interface of package `torvyn:streaming@0.1.0`.
///
/// Declares the `buffer`, `mutable-buffer` and `flow-context` resources, the
/// `element-meta`, `stream-element` and `output-element` records, and the
/// `buffer-error`, `process-result`, `process-error` and `backpressure-signal`
/// types. Shipped as `types.wit` by every template in this file except the
/// empty one.
const TORVYN_STREAMING_TYPES_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface types {
    resource buffer {
        size: func() -> u64;
        content-type: func() -> string;
        read: func(offset: u64, len: u64) -> list<u8>;
        read-all: func() -> list<u8>;
    }

    resource mutable-buffer {
        write: func(offset: u64, bytes: list<u8>) -> result<_, buffer-error>;
        append: func(bytes: list<u8>) -> result<_, buffer-error>;
        size: func() -> u64;
        capacity: func() -> u64;
        set-content-type: func(content-type: string);
        freeze: func() -> buffer;
    }

    variant buffer-error {
        capacity-exceeded,
        out-of-bounds,
        allocation-failed(string),
    }

    resource flow-context {
        trace-id: func() -> string;
        span-id: func() -> string;
        deadline-ns: func() -> u64;
        flow-id: func() -> string;
    }

    record element-meta {
        sequence: u64,
        timestamp-ns: u64,
        content-type: string,
    }

    record stream-element {
        meta: element-meta,
        payload: borrow<buffer>,
        context: borrow<flow-context>,
    }

    record output-element {
        meta: element-meta,
        payload: buffer,
    }

    variant process-result {
        emit(output-element),
        drop,
    }

    variant process-error {
        invalid-input(string),
        unavailable(string),
        internal(string),
        deadline-exceeded,
        fatal(string),
    }

    enum backpressure-signal {
        ready,
        pause,
    }
}
"#;
149
/// WIT source for the `processor` interface: a single `process` function that
/// takes a `stream-element` and returns `result<process-result, process-error>`.
const TORVYN_STREAMING_PROCESSOR_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface processor {
    use types.{stream-element, process-result, process-error};

    process: func(input: stream-element) -> result<process-result, process-error>;
}
"#;
158
/// WIT source for the `buffer-allocator` interface: `allocate` creates a
/// `mutable-buffer` from a capacity hint, and `clone-into-mutable` creates one
/// from a borrowed `buffer`. Both return `buffer-error` on failure.
const TORVYN_STREAMING_BUFFER_ALLOCATOR_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface buffer-allocator {
    use types.{mutable-buffer, buffer-error, buffer};

    allocate: func(capacity-hint: u64) -> result<mutable-buffer, buffer-error>;
    clone-into-mutable: func(source: borrow<buffer>) -> result<mutable-buffer, buffer-error>;
}
"#;
168
/// WIT source for the `lifecycle` interface: `init` takes a config string and
/// may fail with `process-error`; `teardown` takes and returns nothing.
///
/// NOTE(review): in this file only `transform_template` ships this contract,
/// and the `transform` world neither imports nor exports it — confirm whether
/// that is intentional.
const TORVYN_STREAMING_LIFECYCLE_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface lifecycle {
    use types.{process-error};

    init: func(config: string) -> result<_, process-error>;
    teardown: func();
}
"#;
178
/// WIT source for the `source` interface: `pull` yields an optional
/// `output-element` (or a `process-error`), and `notify-backpressure` receives
/// a `backpressure-signal`.
const TORVYN_STREAMING_SOURCE_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface source {
    use types.{output-element, process-error, backpressure-signal};

    pull: func() -> result<option<output-element>, process-error>;
    notify-backpressure: func(signal: backpressure-signal);
}
"#;
188
/// WIT source for the `sink` interface: `push` accepts a `stream-element` and
/// returns a `backpressure-signal` (or a `process-error`); `complete` returns
/// an empty result.
const TORVYN_STREAMING_SINK_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface sink {
    use types.{stream-element, process-error, backpressure-signal};

    push: func(element: stream-element) -> result<backpressure-signal, process-error>;
    complete: func() -> result<_, process-error>;
}
"#;
198
/// WIT source for the `transform` world: imports `types` and
/// `buffer-allocator`, exports `processor`.
const TORVYN_STREAMING_TRANSFORM_WORLD_WIT: &str = r#"package torvyn:streaming@0.1.0;

world transform {
    import types;
    import buffer-allocator;

    export processor;
}
"#;
208
/// `src/lib.rs` body for the `transform` template: a pass-through processor.
///
/// The generated `process` copies the borrowed input payload into a freshly
/// allocated buffer and emits it with the input's metadata. It must build an
/// `OutputElement`: the `emit` case of `process-result` carries an
/// `output-element` (owned payload), not the borrowed `stream-element` the
/// function receives, so emitting `input` directly does not type-check.
///
/// Placeholders: `{{date}}`, `{{torvyn_version}}`, `{{component_type}}`.
const TRANSFORM_LIB_RS: &str = r#"// Generated by `torvyn init --template transform` on {{date}}
// Torvyn CLI v{{torvyn_version}}
//
// This component implements the torvyn:streaming/processor interface.
// It receives stream elements, transforms them, and produces output elements.

wit_bindgen::generate!({
    world: "transform",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::processor::{Guest, ProcessResult};
use torvyn::streaming::types::{StreamElement, OutputElement, ProcessError};
use torvyn::streaming::buffer_allocator;

struct {{component_type}};

impl Guest for {{component_type}} {
    fn process(input: StreamElement) -> Result<ProcessResult, ProcessError> {
        // TODO: Implement your transform logic here.
        //
        // `input` contains:
        //   - input.meta: element metadata (sequence, timestamp, content type)
        //   - input.payload: a borrowed handle to the payload buffer
        //   - input.context: a borrowed handle to the flow context (trace ID, deadline)
        //
        // Pass-through: copy the payload into a new buffer and emit it unchanged.
        let data = input.payload.read_all();
        let out_buf = buffer_allocator::allocate(data.len() as u64)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        out_buf.append(&data)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;

        Ok(ProcessResult::Emit(OutputElement {
            meta: input.meta,
            payload: out_buf.freeze(),
        }))
    }
}

export!({{component_type}});
"#;
240
/// `README.md` body for the `transform` template: quick-start commands,
/// project layout, and documentation links. `{{project_name}}` is a
/// placeholder for the title.
const TRANSFORM_README: &str = r#"# {{project_name}}

A Torvyn streaming transform component.

## Quick Start

```bash
torvyn check # Validate contracts and manifest
torvyn build # Compile to WebAssembly component
torvyn run # Execute the pipeline locally
```

## Project Structure

- `Torvyn.toml` — Project manifest
- `wit/torvyn-streaming/` — Torvyn streaming WIT contracts
- `src/lib.rs` — Component implementation

## Learn More

- [Torvyn Documentation](https://docs.torvyn.dev)
- [WIT Contract Guide](https://docs.torvyn.dev/guides/wit-primer)
"#;
264
265pub fn source_template() -> Template {
271 Template {
272 description: "Data producer (no input, one output)".into(),
273 files: vec![
274 tf("Torvyn.toml", SOURCE_TORVYN_TOML),
275 tf("Cargo.toml", SOURCE_CARGO_TOML),
276 tf("wit/torvyn-streaming/types.wit", TORVYN_STREAMING_TYPES_WIT),
277 tf(
278 "wit/torvyn-streaming/source.wit",
279 TORVYN_STREAMING_SOURCE_WIT,
280 ),
281 tf(
282 "wit/torvyn-streaming/buffer-allocator.wit",
283 TORVYN_STREAMING_BUFFER_ALLOCATOR_WIT,
284 ),
285 tf(
286 "wit/torvyn-streaming/world.wit",
287 TORVYN_STREAMING_SOURCE_WORLD_WIT,
288 ),
289 tf("src/lib.rs", SOURCE_LIB_RS),
290 tf(".gitignore", COMMON_GITIGNORE),
291 tf("README.md", SOURCE_README),
292 ],
293 }
294}
295
/// `Torvyn.toml` body for the `source` template: a `[torvyn]` table plus one
/// `[[component]]` entry rooted at `.` with language `rust`.
/// `{{project_name}}` and `{{contract_version}}` are placeholders.
const SOURCE_TORVYN_TOML: &str = r#"[torvyn]
name = "{{project_name}}"
version = "0.1.0"
contract_version = "{{contract_version}}"

[[component]]
name = "{{project_name}}"
path = "."
language = "rust"
"#;
306
/// `Cargo.toml` body for the `source` template: a `cdylib` crate depending on
/// `wit-bindgen` 0.36. `{{project_name}}` is a placeholder.
const SOURCE_CARGO_TOML: &str = r#"[package]
name = "{{project_name}}"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
wit-bindgen = "0.36"

[package.metadata.component]
package = "{{project_name}}:component"
"#;
321
/// WIT source for the `data-source` world: imports `types` and
/// `buffer-allocator`, exports `source`.
const TORVYN_STREAMING_SOURCE_WORLD_WIT: &str = r#"package torvyn:streaming@0.1.0;

world data-source {
    import types;
    import buffer-allocator;

    export source;
}
"#;
331
/// `src/lib.rs` body for the `source` template.
///
/// The generated `pull` increments a `static mut` counter, emits a
/// `"Hello, Torvyn! (n)"` text element for counts 1..=1000, and returns
/// `Ok(None)` afterwards. `notify_backpressure` is an empty stub.
///
/// Placeholders: `{{date}}`, `{{torvyn_version}}`, `{{component_type}}`.
const SOURCE_LIB_RS: &str = r#"// Generated by `torvyn init --template source` on {{date}}
// Torvyn CLI v{{torvyn_version}}
//
// This component implements the torvyn:streaming/source interface.
// It generates stream elements for downstream processing.

wit_bindgen::generate!({
    world: "data-source",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::source::Guest;
use torvyn::streaming::types::{OutputElement, ElementMeta, ProcessError, BackpressureSignal};
use torvyn::streaming::buffer_allocator;

struct {{component_type}};

static mut COUNTER: u64 = 0;

impl Guest for {{component_type}} {
    fn pull() -> Result<Option<OutputElement>, ProcessError> {
        // TODO: Replace with your data generation logic.
        //
        // Return `Ok(None)` to signal end of stream.
        // Return `Ok(Some(element))` to produce an element.

        let count = unsafe {
            COUNTER += 1;
            COUNTER
        };

        if count > 1000 {
            return Ok(None); // End of stream after 1000 elements
        }

        let message = format!("Hello, Torvyn! ({count})");
        let buf = buffer_allocator::allocate(message.len() as u64)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.append(message.as_bytes())
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;

        Ok(Some(OutputElement {
            meta: ElementMeta {
                sequence: count,
                timestamp_ns: 0,
                content_type: "text/plain".to_string(),
            },
            payload: buf.freeze(),
        }))
    }

    fn notify_backpressure(_signal: BackpressureSignal) {
        // TODO: Handle backpressure signals from downstream.
    }
}

export!({{component_type}});
"#;
390
/// `README.md` body for the `source` template: a title, one-line description
/// and quick-start commands. `{{project_name}}` is a placeholder.
const SOURCE_README: &str = r#"# {{project_name}}

A Torvyn streaming source component.

## Quick Start

```bash
torvyn check # Validate contracts
torvyn build # Compile to WebAssembly
```
"#;
402
403pub fn sink_template() -> Template {
409 Template {
410 description: "Data consumer (one input, no output)".into(),
411 files: vec![
412 tf("Torvyn.toml", SINK_TORVYN_TOML),
413 tf("Cargo.toml", SINK_CARGO_TOML),
414 tf("wit/torvyn-streaming/types.wit", TORVYN_STREAMING_TYPES_WIT),
415 tf("wit/torvyn-streaming/sink.wit", TORVYN_STREAMING_SINK_WIT),
416 tf(
417 "wit/torvyn-streaming/world.wit",
418 TORVYN_STREAMING_SINK_WORLD_WIT,
419 ),
420 tf("src/lib.rs", SINK_LIB_RS),
421 tf(".gitignore", COMMON_GITIGNORE),
422 tf("README.md", SINK_README),
423 ],
424 }
425}
426
/// `Torvyn.toml` body for the `sink` template: a `[torvyn]` table plus one
/// `[[component]]` entry rooted at `.` with language `rust`.
/// `{{project_name}}` and `{{contract_version}}` are placeholders.
const SINK_TORVYN_TOML: &str = r#"[torvyn]
name = "{{project_name}}"
version = "0.1.0"
contract_version = "{{contract_version}}"

[[component]]
name = "{{project_name}}"
path = "."
language = "rust"
"#;
437
/// `Cargo.toml` body for the `sink` template: a `cdylib` crate depending on
/// `wit-bindgen` 0.36. `{{project_name}}` is a placeholder.
const SINK_CARGO_TOML: &str = r#"[package]
name = "{{project_name}}"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
wit-bindgen = "0.36"

[package.metadata.component]
package = "{{project_name}}:component"
"#;
452
/// WIT source for the `data-sink` world: imports `types`, exports `sink`.
const TORVYN_STREAMING_SINK_WORLD_WIT: &str = r#"package torvyn:streaming@0.1.0;

world data-sink {
    import types;

    export sink;
}
"#;
461
/// `src/lib.rs` body for the `sink` template.
///
/// The generated `push` reads the whole payload, prints it as lossy UTF-8 and
/// answers `BackpressureSignal::Ready`; `complete` returns `Ok(())`.
///
/// Placeholders: `{{date}}`, `{{torvyn_version}}`, `{{component_type}}`.
const SINK_LIB_RS: &str = r#"// Generated by `torvyn init --template sink` on {{date}}
// Torvyn CLI v{{torvyn_version}}
//
// This component implements the torvyn:streaming/sink interface.
// It receives stream elements and consumes them (e.g., writes to stdout).

wit_bindgen::generate!({
    world: "data-sink",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::sink::Guest;
use torvyn::streaming::types::{StreamElement, ProcessError, BackpressureSignal};

struct {{component_type}};

impl Guest for {{component_type}} {
    fn push(element: StreamElement) -> Result<BackpressureSignal, ProcessError> {
        // TODO: Implement your sink logic here.
        let data = element.payload.read_all();
        let text = String::from_utf8_lossy(&data);
        println!("{text}");
        Ok(BackpressureSignal::Ready)
    }

    fn complete() -> Result<(), ProcessError> {
        // Called when the stream ends.
        Ok(())
    }
}

export!({{component_type}});
"#;
495
/// `README.md` body for the `sink` template: a title, one-line description
/// and quick-start commands. `{{project_name}}` is a placeholder.
const SINK_README: &str = r#"# {{project_name}}

A Torvyn streaming sink component.

## Quick Start

```bash
torvyn check
torvyn build
```
"#;
507
508pub fn filter_template() -> Template {
514 Template {
515 description: "Content filter/guard".into(),
516 files: vec![
517 tf("Torvyn.toml", FILTER_TORVYN_TOML),
518 tf("Cargo.toml", FILTER_CARGO_TOML),
519 tf("wit/torvyn-streaming/types.wit", TORVYN_STREAMING_TYPES_WIT),
520 tf(
521 "wit/torvyn-streaming/filter.wit",
522 TORVYN_STREAMING_FILTER_WIT,
523 ),
524 tf(
525 "wit/torvyn-streaming/world.wit",
526 TORVYN_STREAMING_FILTER_WORLD_WIT,
527 ),
528 tf("src/lib.rs", FILTER_LIB_RS),
529 tf(".gitignore", COMMON_GITIGNORE),
530 tf("README.md", FILTER_README),
531 ],
532 }
533}
534
/// `Torvyn.toml` body for the `filter` template: a `[torvyn]` table plus one
/// `[[component]]` entry rooted at `.` with language `rust`.
/// `{{project_name}}` and `{{contract_version}}` are placeholders.
const FILTER_TORVYN_TOML: &str = r#"[torvyn]
name = "{{project_name}}"
version = "0.1.0"
contract_version = "{{contract_version}}"

[[component]]
name = "{{project_name}}"
path = "."
language = "rust"
"#;
545
/// `Cargo.toml` body for the `filter` template: a `cdylib` crate depending on
/// `wit-bindgen` 0.36. `{{project_name}}` is a placeholder.
const FILTER_CARGO_TOML: &str = r#"[package]
name = "{{project_name}}"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
wit-bindgen = "0.36"

[package.metadata.component]
package = "{{project_name}}:component"
"#;
560
/// WIT source for the `filter` interface: `evaluate` takes a `stream-element`
/// and returns `result<bool, process-error>`; the embedded WIT doc comments
/// describe the meaning of each outcome.
const TORVYN_STREAMING_FILTER_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface filter {
    use types.{stream-element, process-error};

    /// Evaluate whether a stream element should pass through.
    ///
    /// - ok(true): Element passes. Runtime forwards it.
    /// - ok(false): Element rejected. Runtime drops it.
    /// - err(error): Filter encountered an error.
    evaluate: func(element: stream-element) -> result<bool, process-error>;
}
"#;
574
/// WIT source for the `content-filter` world: imports `types`, exports
/// `filter`.
const TORVYN_STREAMING_FILTER_WORLD_WIT: &str = r#"package torvyn:streaming@0.1.0;

world content-filter {
    import types;

    export filter;
}
"#;
583
/// `src/lib.rs` body for the `filter` template.
///
/// The generated `evaluate` ignores its argument and returns `Ok(true)`, so
/// every element passes by default.
///
/// Placeholders: `{{date}}`, `{{torvyn_version}}`, `{{component_type}}`.
const FILTER_LIB_RS: &str = r#"// Generated by `torvyn init --template filter` on {{date}}
// Torvyn CLI v{{torvyn_version}}
//
// This component implements the torvyn:streaming/filter interface.
// It evaluates each element and decides whether to pass or drop it.

wit_bindgen::generate!({
    world: "content-filter",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::filter::Guest;
use torvyn::streaming::types::{StreamElement, ProcessError};

struct {{component_type}};

impl Guest for {{component_type}} {
    fn evaluate(element: StreamElement) -> Result<bool, ProcessError> {
        // TODO: Implement your filter logic here.
        //
        // Return `Ok(true)` to pass the element through.
        // Return `Ok(false)` to drop it.
        //
        // Access payload bytes: element.payload.read_all()
        // Access metadata: element.meta.content_type

        // Default: pass everything
        Ok(true)
    }
}

export!({{component_type}});
"#;
617
/// `README.md` body for the `filter` template: quick-start commands and
/// project layout. `{{project_name}}` is a placeholder.
const FILTER_README: &str = r#"# {{project_name}}

A Torvyn streaming filter component.

## Quick Start

```bash
torvyn check # Validate contracts
torvyn build # Compile to WebAssembly
```

## Project Structure

- `Torvyn.toml` — Project manifest
- `wit/torvyn-streaming/` — Torvyn streaming WIT contracts
- `src/lib.rs` — Component implementation
"#;
635
636pub fn router_template() -> Template {
642 Template {
643 description: "Multi-output router".into(),
644 files: vec![
645 tf("Torvyn.toml", TRANSFORM_TORVYN_TOML),
646 tf("Cargo.toml", TRANSFORM_CARGO_TOML),
647 tf("wit/torvyn-streaming/types.wit", TORVYN_STREAMING_TYPES_WIT),
648 tf(
649 "wit/torvyn-streaming/router.wit",
650 TORVYN_STREAMING_ROUTER_WIT,
651 ),
652 tf(
653 "wit/torvyn-streaming/world.wit",
654 TORVYN_STREAMING_ROUTER_WORLD_WIT,
655 ),
656 tf("src/lib.rs", ROUTER_LIB_RS),
657 tf(".gitignore", COMMON_GITIGNORE),
658 tf("README.md", ROUTER_README),
659 ],
660 }
661}
662
/// WIT source for the `router` interface: `route` takes a `stream-element`
/// and returns a list of port names (or a `process-error`); the embedded WIT
/// doc comments describe empty-list and multi-name semantics.
const TORVYN_STREAMING_ROUTER_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface router {
    use types.{stream-element, process-error};

    /// Determine which output port(s) should receive this element.
    ///
    /// Returns a list of port names. Empty list means drop.
    /// Multiple names means fan-out.
    route: func(element: stream-element) -> result<list<string>, process-error>;
}
"#;
675
/// WIT source for the `content-router` world: imports `types`, exports
/// `router`.
const TORVYN_STREAMING_ROUTER_WORLD_WIT: &str = r#"package torvyn:streaming@0.1.0;

world content-router {
    import types;

    export router;
}
"#;
684
/// `src/lib.rs` body for the `router` template.
///
/// The generated `route` ignores its argument and returns the single port
/// name `"default"`.
///
/// Placeholders: `{{date}}`, `{{torvyn_version}}`, `{{component_type}}`.
const ROUTER_LIB_RS: &str = r#"// Generated by `torvyn init --template router` on {{date}}
// Torvyn CLI v{{torvyn_version}}
//
// This component implements the torvyn:streaming/router interface.
// It routes each element to one of multiple output ports.

wit_bindgen::generate!({
    world: "content-router",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::router::Guest;
use torvyn::streaming::types::{StreamElement, ProcessError};

struct {{component_type}};

impl Guest for {{component_type}} {
    fn route(element: StreamElement) -> Result<Vec<String>, ProcessError> {
        // TODO: Return the port names to route the element to.
        //
        // Return an empty list to drop the element.
        // Return multiple names for fan-out (runtime borrows the
        // same buffer to each downstream).

        // Default: route everything to "default"
        Ok(vec!["default".to_string()])
    }
}

export!({{component_type}});
"#;
716
/// `README.md` body for the `router` template: quick-start commands and
/// project layout. `{{project_name}}` is a placeholder.
const ROUTER_README: &str = r#"# {{project_name}}

A Torvyn streaming router component.

## Quick Start

```bash
torvyn check # Validate contracts
torvyn build # Compile to WebAssembly
```

## Project Structure

- `Torvyn.toml` — Project manifest
- `wit/torvyn-streaming/` — Torvyn streaming WIT contracts
- `src/lib.rs` — Component implementation
"#;
734
735pub fn aggregator_template() -> Template {
741 Template {
742 description: "Stateful windowed aggregator".into(),
743 files: vec![
744 tf("Torvyn.toml", TRANSFORM_TORVYN_TOML),
745 tf("Cargo.toml", TRANSFORM_CARGO_TOML),
746 tf("wit/torvyn-streaming/types.wit", TORVYN_STREAMING_TYPES_WIT),
747 tf(
748 "wit/torvyn-streaming/buffer-allocator.wit",
749 TORVYN_STREAMING_BUFFER_ALLOCATOR_WIT,
750 ),
751 tf(
752 "wit/torvyn-streaming/aggregator.wit",
753 TORVYN_STREAMING_AGGREGATOR_WIT,
754 ),
755 tf(
756 "wit/torvyn-streaming/world.wit",
757 TORVYN_STREAMING_AGGREGATOR_WORLD_WIT,
758 ),
759 tf("src/lib.rs", AGGREGATOR_LIB_RS),
760 tf(".gitignore", COMMON_GITIGNORE),
761 tf("README.md", AGGREGATOR_README),
762 ],
763 }
764}
765
/// WIT source for the `aggregator` interface: `ingest` takes a
/// `stream-element` and returns an optional `output-element`; `flush` returns
/// a list of `output-element`s. Both may fail with `process-error`.
const TORVYN_STREAMING_AGGREGATOR_WIT: &str = r#"package torvyn:streaming@0.1.0;

interface aggregator {
    use types.{stream-element, output-element, process-error};

    /// Ingest a stream element into internal state.
    ///
    /// - ok(none): Absorbed, no output yet.
    /// - ok(some(element)): Absorbed AND aggregated result ready.
    /// - err(error): Ingestion failed.
    ingest: func(element: stream-element) -> result<option<output-element>, process-error>;

    /// Signal no more elements. Emit remaining buffered results.
    flush: func() -> result<list<output-element>, process-error>;
}
"#;
782
/// WIT source for the `stream-aggregator` world: imports `types` and
/// `buffer-allocator`, exports `aggregator`.
const TORVYN_STREAMING_AGGREGATOR_WORLD_WIT: &str = r#"package torvyn:streaming@0.1.0;

world stream-aggregator {
    import types;
    import buffer-allocator;

    export aggregator;
}
"#;
792
/// `src/lib.rs` body for the `aggregator` template.
///
/// The generated `ingest` counts elements in a `static mut` counter; on every
/// tenth element (`WINDOW_SIZE`) it resets the counter and emits a copy of
/// that element's payload with its metadata, otherwise it returns `Ok(None)`.
/// `flush` returns an empty list.
///
/// NOTE(review): the generated `unsafe` block wraps the whole window branch,
/// including the safe allocator calls — consider narrowing it to the counter
/// accesses.
///
/// Placeholders: `{{date}}`, `{{torvyn_version}}`, `{{component_type}}`.
const AGGREGATOR_LIB_RS: &str = r#"// Generated by `torvyn init --template aggregator` on {{date}}
// Torvyn CLI v{{torvyn_version}}
//
// This component implements the torvyn:streaming/aggregator interface.
// It accumulates elements over a window and emits aggregated results.

wit_bindgen::generate!({
    world: "stream-aggregator",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::aggregator::Guest;
use torvyn::streaming::types::{StreamElement, OutputElement, ElementMeta, ProcessError};
use torvyn::streaming::buffer_allocator;

struct {{component_type}};

// Window state
static mut WINDOW_COUNT: u64 = 0;
const WINDOW_SIZE: u64 = 10;

impl Guest for {{component_type}} {
    fn ingest(element: StreamElement) -> Result<Option<OutputElement>, ProcessError> {
        // TODO: Implement your aggregation logic.
        //
        // Return Ok(None) to absorb without producing output.
        // Return Ok(Some(output)) when a window completes.

        unsafe {
            WINDOW_COUNT += 1;
            if WINDOW_COUNT >= WINDOW_SIZE {
                WINDOW_COUNT = 0;

                // Clone input buffer and emit aggregated result
                let data = element.payload.read_all();
                let out = buffer_allocator::allocate(data.len() as u64)
                    .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
                out.append(&data)
                    .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;

                Ok(Some(OutputElement {
                    meta: ElementMeta {
                        sequence: element.meta.sequence,
                        timestamp_ns: element.meta.timestamp_ns,
                        content_type: element.meta.content_type,
                    },
                    payload: out.freeze(),
                }))
            } else {
                Ok(None)
            }
        }
    }

    fn flush() -> Result<Vec<OutputElement>, ProcessError> {
        // Called when the stream ends. Return any remaining buffered results.
        Ok(vec![])
    }
}

export!({{component_type}});
"#;
855
/// `README.md` body for the `aggregator` template: quick-start commands and
/// project layout. `{{project_name}}` is a placeholder.
const AGGREGATOR_README: &str = r#"# {{project_name}}

A Torvyn stateful windowed aggregator component.

## Quick Start

```bash
torvyn check # Validate contracts
torvyn build # Compile to WebAssembly
```

## Project Structure

- `Torvyn.toml` — Project manifest
- `wit/torvyn-streaming/` — Torvyn streaming WIT contracts
- `src/lib.rs` — Component implementation
"#;
873
874pub fn full_pipeline_template() -> Template {
880 Template {
881 description: "Complete pipeline with source + transform + sink".into(),
882 files: vec![
883 tf("Torvyn.toml", FULL_PIPELINE_TORVYN_TOML),
884 tf("components/source/Cargo.toml", FP_SOURCE_CARGO_TOML),
886 tf(
887 "components/source/wit/torvyn-streaming/types.wit",
888 TORVYN_STREAMING_TYPES_WIT,
889 ),
890 tf(
891 "components/source/wit/torvyn-streaming/source.wit",
892 TORVYN_STREAMING_SOURCE_WIT,
893 ),
894 tf(
895 "components/source/wit/torvyn-streaming/buffer-allocator.wit",
896 TORVYN_STREAMING_BUFFER_ALLOCATOR_WIT,
897 ),
898 tf(
899 "components/source/wit/torvyn-streaming/world.wit",
900 TORVYN_STREAMING_SOURCE_WORLD_WIT,
901 ),
902 tf("components/source/src/lib.rs", FP_SOURCE_LIB_RS),
903 tf("components/transform/Cargo.toml", FP_TRANSFORM_CARGO_TOML),
905 tf(
906 "components/transform/wit/torvyn-streaming/types.wit",
907 TORVYN_STREAMING_TYPES_WIT,
908 ),
909 tf(
910 "components/transform/wit/torvyn-streaming/processor.wit",
911 TORVYN_STREAMING_PROCESSOR_WIT,
912 ),
913 tf(
914 "components/transform/wit/torvyn-streaming/buffer-allocator.wit",
915 TORVYN_STREAMING_BUFFER_ALLOCATOR_WIT,
916 ),
917 tf(
918 "components/transform/wit/torvyn-streaming/world.wit",
919 TORVYN_STREAMING_TRANSFORM_WORLD_WIT,
920 ),
921 tf("components/transform/src/lib.rs", FP_TRANSFORM_LIB_RS),
922 tf("components/sink/Cargo.toml", FP_SINK_CARGO_TOML),
924 tf(
925 "components/sink/wit/torvyn-streaming/types.wit",
926 TORVYN_STREAMING_TYPES_WIT,
927 ),
928 tf(
929 "components/sink/wit/torvyn-streaming/sink.wit",
930 TORVYN_STREAMING_SINK_WIT,
931 ),
932 tf(
933 "components/sink/wit/torvyn-streaming/world.wit",
934 TORVYN_STREAMING_SINK_WORLD_WIT,
935 ),
936 tf("components/sink/src/lib.rs", FP_SINK_LIB_RS),
937 tf(".gitignore", COMMON_GITIGNORE),
938 tf("README.md", FP_README),
939 ],
940 }
941}
942
/// `Torvyn.toml` body for the full-pipeline template.
///
/// Declares three components (`source`, `transform`, `sink`) under
/// `components/` and a `main` flow whose edges connect source → transform →
/// sink. `{{project_name}}` and `{{contract_version}}` are placeholders.
const FULL_PIPELINE_TORVYN_TOML: &str = r#"[torvyn]
name = "{{project_name}}"
version = "0.1.0"
description = "A complete streaming pipeline with source, transform, and sink"
contract_version = "{{contract_version}}"

[[component]]
name = "source"
path = "components/source"
language = "rust"

[[component]]
name = "transform"
path = "components/transform"
language = "rust"

[[component]]
name = "sink"
path = "components/sink"
language = "rust"

[flow.main]
description = "Generate messages, transform them, and print to stdout"

[flow.main.nodes.source]
component = "source"
interface = "torvyn:streaming/source"

[flow.main.nodes.transform]
component = "transform"
interface = "torvyn:streaming/processor"

[flow.main.nodes.sink]
component = "sink"
interface = "torvyn:streaming/sink"

[[flow.main.edges]]
from = { node = "source", port = "output" }
to = { node = "transform", port = "input" }

[[flow.main.edges]]
from = { node = "transform", port = "output" }
to = { node = "sink", port = "input" }
"#;
987
/// `Cargo.toml` body for the full pipeline's `source` crate: a `cdylib`
/// depending on `wit-bindgen` 0.36. The package name is fixed (no placeholders).
const FP_SOURCE_CARGO_TOML: &str = r#"[package]
name = "source"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
wit-bindgen = "0.36"

[package.metadata.component]
package = "source:component"
"#;
1002
/// `src/lib.rs` body for the full pipeline's `source` component.
///
/// The generated `pull` increments a `static mut` counter, emits a
/// `"Hello, Torvyn! (n)"` text element for counts 1..=1000, and returns
/// `Ok(None)` afterwards. `{{project_name}}` appears only in the header
/// comment; the component type is the fixed name `Source`.
const FP_SOURCE_LIB_RS: &str = r#"// Source component for the {{project_name}} pipeline
// Generates numbered greeting messages.

wit_bindgen::generate!({
    world: "data-source",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::source::Guest;
use torvyn::streaming::types::{OutputElement, ElementMeta, ProcessError, BackpressureSignal};
use torvyn::streaming::buffer_allocator;

struct Source;

static mut COUNTER: u64 = 0;

impl Guest for Source {
    fn pull() -> Result<Option<OutputElement>, ProcessError> {
        let count = unsafe {
            COUNTER += 1;
            COUNTER
        };

        if count > 1000 {
            return Ok(None);
        }

        let message = format!("Hello, Torvyn! ({count})");
        let buf = buffer_allocator::allocate(message.len() as u64)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.append(message.as_bytes())
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;

        Ok(Some(OutputElement {
            meta: ElementMeta {
                sequence: count,
                timestamp_ns: 0,
                content_type: "text/plain".to_string(),
            },
            payload: buf.freeze(),
        }))
    }

    fn notify_backpressure(_signal: BackpressureSignal) {}
}

export!(Source);
"#;
1051
/// `Cargo.toml` body for the full pipeline's `transform` crate: a `cdylib`
/// depending on `wit-bindgen` 0.36. The package name is fixed (no placeholders).
const FP_TRANSFORM_CARGO_TOML: &str = r#"[package]
name = "transform"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
wit-bindgen = "0.36"

[package.metadata.component]
package = "transform:component"
"#;
1066
/// `src/lib.rs` body for the full pipeline's `transform` component.
///
/// The generated `process` reads the payload as lossy UTF-8, upper-cases it,
/// writes it to a freshly allocated buffer and emits it with the input's
/// metadata. `{{project_name}}` appears only in the header comment; the
/// component type is the fixed name `Transform`.
const FP_TRANSFORM_LIB_RS: &str = r#"// Transform component for the {{project_name}} pipeline
// Converts input text to uppercase.

wit_bindgen::generate!({
    world: "transform",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::processor::{Guest, ProcessResult};
use torvyn::streaming::types::{StreamElement, OutputElement, ElementMeta, ProcessError};
use torvyn::streaming::buffer_allocator;

struct Transform;

impl Guest for Transform {
    fn process(input: StreamElement) -> Result<ProcessResult, ProcessError> {
        let data = input.payload.read_all();
        let text = String::from_utf8_lossy(&data);
        let upper = text.to_uppercase();

        let out_buf = buffer_allocator::allocate(upper.len() as u64)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        out_buf.append(upper.as_bytes())
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;

        Ok(ProcessResult::Emit(OutputElement {
            meta: ElementMeta {
                sequence: input.meta.sequence,
                timestamp_ns: input.meta.timestamp_ns,
                content_type: input.meta.content_type,
            },
            payload: out_buf.freeze(),
        }))
    }
}

export!(Transform);
"#;
1105
/// `Cargo.toml` body for the full pipeline's `sink` crate: a `cdylib`
/// depending on `wit-bindgen` 0.36. The package name is fixed (no placeholders).
const FP_SINK_CARGO_TOML: &str = r#"[package]
name = "sink"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
wit-bindgen = "0.36"

[package.metadata.component]
package = "sink:component"
"#;
1120
/// `src/lib.rs` body for the full pipeline's `sink` component.
///
/// The generated `push` reads the whole payload, prints it as lossy UTF-8 and
/// answers `BackpressureSignal::Ready`; `complete` returns `Ok(())`.
/// `{{project_name}}` appears only in the header comment; the component type
/// is the fixed name `Sink`.
const FP_SINK_LIB_RS: &str = r#"// Sink component for the {{project_name}} pipeline
// Prints received messages to stdout.

wit_bindgen::generate!({
    world: "data-sink",
    path: "wit/torvyn-streaming",
});

use exports::torvyn::streaming::sink::Guest;
use torvyn::streaming::types::{StreamElement, ProcessError, BackpressureSignal};

struct Sink;

impl Guest for Sink {
    fn push(element: StreamElement) -> Result<BackpressureSignal, ProcessError> {
        let data = element.payload.read_all();
        let text = String::from_utf8_lossy(&data);
        println!("{text}");
        Ok(BackpressureSignal::Ready)
    }

    fn complete() -> Result<(), ProcessError> {
        Ok(())
    }
}

export!(Sink);
"#;
1149
/// `README.md` body for the full-pipeline template: component overview,
/// quick-start commands and project layout. `{{project_name}}` is a
/// placeholder for the title.
const FP_README: &str = r#"# {{project_name}}

A complete Torvyn streaming pipeline with three components:

- **source** — generates numbered greeting messages
- **transform** — converts text to uppercase
- **sink** — prints messages to stdout

## Quick Start

```bash
torvyn check # Validate contracts and manifest
torvyn build # Compile all components to WebAssembly
torvyn run # Run the pipeline
torvyn run --limit 10 # Run with element limit
```

## Project Structure

- `Torvyn.toml` — Project manifest with flow definition
- `components/source/` — Source component (data producer)
- `components/transform/` — Transform component (data processor)
- `components/sink/` — Sink component (data consumer)
"#;
1174
1175pub fn empty_template() -> Template {
1181 Template {
1182 description: "Minimal skeleton for experienced users".into(),
1183 files: vec![
1184 tf("Torvyn.toml", EMPTY_TORVYN_TOML),
1185 tf(".gitignore", COMMON_GITIGNORE),
1186 ],
1187 }
1188}
1189
/// `Torvyn.toml` body for the empty template: only the `[torvyn]` table, with
/// no components or flows. `{{project_name}}` and `{{contract_version}}` are
/// placeholders.
const EMPTY_TORVYN_TOML: &str = r#"[torvyn]
name = "{{project_name}}"
version = "0.1.0"
contract_version = "{{contract_version}}"
"#;
1195
/// `.gitignore` body shared by every template in this file: ignores
/// `target/`, `.torvyn/` and `*.wasm`.
const COMMON_GITIGNORE: &str = r#"target/
.torvyn/
*.wasm
"#;