parallel/parallel.rs
1// Threaded fan-out / fan-in example.
2//
3// Spawns one thread per topic to write articles in parallel. Each thread gets
4// its own Ctx and Runner -- no shared mutable state, no async runtime.
5//
6// Pipeline per thread: researcher -> writer -> editor -> (loop back to writer if needed) -> done
7//
8// All data flows through the state struct. Swap the stubs for ctx.llm() calls
9// to use a real LLM.
10//
11// Run: cargo run --example parallel
12
13use agent_line::{Agent, Ctx, Outcome, Runner, StepResult, Workflow};
14use std::thread;
15
16// ---------------------------------------------------------------------------
17// State
18// ---------------------------------------------------------------------------
19
20#[derive(Clone, Debug)]
21struct ArticleState {
22 topic: String,
23 research: String,
24 #[allow(dead_code)] // used by LLM calls in the commented-out real implementations
25 guidelines: String,
26 draft: String,
27 feedback: String,
28 revision: u32,
29}
30
31impl ArticleState {
32 fn new(topic: String, guidelines: String) -> Self {
33 Self {
34 topic,
35 research: String::new(),
36 guidelines,
37 draft: String::new(),
38 feedback: String::new(),
39 revision: 0,
40 }
41 }
42}
43
44// ---------------------------------------------------------------------------
45// Agents
46// ---------------------------------------------------------------------------
47
48/// Gathers background material on the topic.
49/// In a real app this would use tools::http for web search, then ctx.llm()
50/// to summarize the findings into research notes.
51struct Researcher;
52impl Agent<ArticleState> for Researcher {
53 fn name(&self) -> &'static str {
54 "researcher"
55 }
56 fn run(&mut self, mut state: ArticleState, ctx: &mut Ctx) -> StepResult<ArticleState> {
57 ctx.log(format!("researching: {}", state.topic));
58
59 // Stub: pretend we did a web search and summarized the results
60 state.research = match state.topic.as_str() {
61 t if t.contains("embedded") => {
62 "Rust's ownership model prevents memory bugs common in C firmware. \
63 The embassy framework provides async on bare metal. \
64 Companies like Espressif ship official Rust support for ESP32."
65 .into()
66 }
67 t if t.contains("plumber") => {
68 "Side projects help tradespeople automate billing and scheduling. \
69 Low-code tools like AppSheet let plumbers build apps without coding. \
70 One plumber built a leak-detection IoT sensor with a Raspberry Pi."
71 .into()
72 }
73 _ => "Raspberry Pi runs Node-RED for home automation wiring diagrams. \
74 Electricians use it to monitor panel loads in real time. \
75 The $35 price point makes it practical for small shops."
76 .into(),
77 };
78
79 Ok((state, Outcome::Continue))
80 }
81}
82
83/// Writes (or rewrites) the article draft.
84///
85/// On the first pass, the system prompt is "write from scratch." When the
86/// editor has sent it back with feedback, the system prompt becomes "rewrite
87/// incorporating this feedback" so the LLM understands it is revising, not
88/// starting over.
89struct Writer;
90impl Agent<ArticleState> for Writer {
91 fn name(&self) -> &'static str {
92 "writer"
93 }
94 fn run(&mut self, mut state: ArticleState, ctx: &mut Ctx) -> StepResult<ArticleState> {
95 state.revision += 1;
96
97 let is_rewrite = !state.feedback.is_empty();
98
99 if is_rewrite {
100 ctx.log(format!(
101 "rewriting draft {} for: {} (feedback: {})",
102 state.revision, state.topic, state.feedback
103 ));
104
105 // Stub for rewrite pass
106 // In a real app:
107 // let response = ctx.llm()
108 // .system(&format!(
109 // "You are a writer. Rewrite this article incorporating the editor's feedback.\n\
110 // Guidelines: {}\n\
111 // Feedback: {}",
112 // state.guidelines, state.feedback
113 // ))
114 // .user(&state.draft)
115 // .send()?;
116 // state.draft = response;
117
118 state.draft = format!(
119 "# {}\n\n\
120 Ever wonder how {} is changing the trades?\n\n\
121 {}",
122 state.topic,
123 state.topic.to_lowercase(),
124 state.research,
125 );
126 state.feedback.clear();
127 } else {
128 ctx.log(format!(
129 "writing draft {} for: {}",
130 state.revision, state.topic
131 ));
132
133 // Stub for first draft
134 // In a real app:
135 // let response = ctx.llm()
136 // .system(&format!(
137 // "You are a writer. Write a short article based on the research notes.\n\
138 // Guidelines: {}",
139 // state.guidelines
140 // ))
141 // .user(&format!("Topic: {}\n\nResearch:\n{}", state.topic, state.research))
142 // .send()?;
143 // state.draft = response;
144
145 state.draft = format!(
146 "# {}\n\n\
147 {} is a interesting topic that many people are talking about.\n\n\
148 {}",
149 state.topic, state.topic, state.research,
150 );
151 }
152
153 Ok((state, Outcome::Continue))
154 }
155}
156
157/// Reviews the draft against the writing guidelines and the author's voice.
158/// Approves or sends it back to the writer with specific feedback.
159struct Editor;
160impl Agent<ArticleState> for Editor {
161 fn name(&self) -> &'static str {
162 "editor"
163 }
164 fn run(&mut self, mut state: ArticleState, ctx: &mut Ctx) -> StepResult<ArticleState> {
165 ctx.log(format!("reviewing rev {}: {}", state.revision, state.topic));
166
167 // Stub: check the draft against guidelines
168 // In a real app:
169 // let response = ctx.llm()
170 // .system(&format!(
171 // "You are an editor. Review this article against the guidelines.\n\
172 // Guidelines: {}\n\n\
173 // If the article passes, respond with exactly: APPROVED\n\
174 // Otherwise list the specific changes needed.",
175 // state.guidelines
176 // ))
177 // .user(&state.draft)
178 // .send()?;
179 //
180 // if response.contains("APPROVED") { ... } else { state.feedback = response; }
181
182 // Stub logic: first draft always needs work, second passes
183 if state.revision < 2 {
184 state.feedback =
185 "opening is bland, needs a hook. 'a interesting' should be 'an interesting'".into();
186 ctx.log(format!("needs revision: {}", state.feedback));
187 Ok((state, Outcome::Next("writer")))
188 } else {
189 ctx.log(format!("approved: {}", state.topic));
190 Ok((state, Outcome::Done))
191 }
192 }
193}
194
195// ---------------------------------------------------------------------------
196// Main
197// ---------------------------------------------------------------------------
198
199fn main() {
200 let topics = vec![
201 "Rust in embedded systems".to_string(),
202 "Why plumbers love side projects".to_string(),
203 "Electricians using Raspberry Pi on the job".to_string(),
204 ];
205
206 // Shared guidelines for all writers -- the author's voice and style rules
207 let guidelines = "\
208 Write in first person. \
209 Do not use emdashes. \
210 Add a touch of humor. \
211 Keep it under 300 words."
212 .to_string();
213
214 println!("=== Fan-out: {} threads ===\n", topics.len());
215
216 // Fan-out: spawn one thread per topic
217 let handles: Vec<_> = topics
218 .into_iter()
219 .enumerate()
220 .map(|(i, topic)| {
221 let guidelines = guidelines.clone();
222
223 thread::spawn(move || {
224 // Each thread gets its own Ctx and Runner -- no shared mutable state
225 let mut ctx = Ctx::new();
226
227 let wf = Workflow::builder("write-article")
228 .register(Researcher)
229 .register(Writer)
230 .register(Editor)
231 .start_at("researcher")
232 .then("writer")
233 .then("editor")
234 .build()
235 .unwrap();
236
237 let mut runner = Runner::new(wf).with_max_retries(5);
238
239 let result = runner.run(ArticleState::new(topic.clone(), guidelines), &mut ctx);
240
241 // Print the log from this thread's pipeline
242 for entry in ctx.logs() {
243 println!(" [thread {}] {}", i, entry);
244 }
245
246 result
247 })
248 })
249 .collect();
250
251 // Fan-in: join all threads, collect results
252 let mut finished = Vec::new();
253 for handle in handles {
254 match handle.join().unwrap() {
255 Ok(state) => finished.push(state),
256 Err(e) => eprintln!("thread failed: {e}"),
257 }
258 }
259
260 // Show results
261 println!("\n=== Fan-in: {} articles ===\n", finished.len());
262 for (i, article) in finished.iter().enumerate() {
263 let preview: String = article.draft.chars().take(72).collect();
264 println!(
265 " {}. {} (rev {})\n {preview}...\n",
266 i + 1,
267 article.topic,
268 article.revision
269 );
270 }
271}