Skip to main content

parallel/
parallel.rs

1// Threaded fan-out / fan-in example.
2//
3// Spawns one thread per topic to write articles in parallel. Each thread gets
4// its own Ctx and Runner -- no shared mutable state, no async runtime.
5//
6// Pipeline per thread: researcher -> writer -> editor -> (loop back to writer if needed) -> done
7//
8// All data flows through the state struct. Swap the stubs for ctx.llm() calls
9// to use a real LLM.
10//
11// Run: cargo run --example parallel
12
13use agent_line::{Agent, Ctx, Outcome, Runner, StepResult, Workflow};
14use std::thread;
15
16// ---------------------------------------------------------------------------
17// State
18// ---------------------------------------------------------------------------
19
/// Everything a single article pipeline carries from one agent to the next.
///
/// Agents take the state by value, fill in their own fields and hand it back,
/// so this struct is the only channel between pipeline steps.
#[derive(Clone, Debug)]
struct ArticleState {
    /// Subject of the article.
    topic: String,
    /// Background notes filled in by the researcher.
    research: String,
    /// Style rules the article should follow.
    #[allow(dead_code)] // used by LLM calls in the commented-out real implementations
    guidelines: String,
    /// Most recent draft produced by the writer.
    draft: String,
    /// Change requests from the editor; empty when nothing is pending.
    feedback: String,
    /// How many drafts the writer has produced so far.
    revision: u32,
}

impl ArticleState {
    /// Creates the starting state for `topic`: no research, draft or feedback
    /// yet, and a revision count of zero.
    fn new(topic: String, guidelines: String) -> Self {
        let blank = String::new;

        Self {
            revision: 0,
            feedback: blank(),
            draft: blank(),
            research: blank(),
            guidelines,
            topic,
        }
    }
}
43
44// ---------------------------------------------------------------------------
45// Agents
46// ---------------------------------------------------------------------------
47
48/// Gathers background material on the topic.
49/// In a real app this would use tools::http for web search, then ctx.llm()
50/// to summarize the findings into research notes.
51struct Researcher;
52impl Agent<ArticleState> for Researcher {
53    fn name(&self) -> &'static str {
54        "researcher"
55    }
56    fn run(&mut self, mut state: ArticleState, ctx: &mut Ctx) -> StepResult<ArticleState> {
57        ctx.log(format!("researching: {}", state.topic));
58
59        // Stub: pretend we did a web search and summarized the results
60        state.research = match state.topic.as_str() {
61            t if t.contains("embedded") => {
62                "Rust's ownership model prevents memory bugs common in C firmware. \
63                 The embassy framework provides async on bare metal. \
64                 Companies like Espressif ship official Rust support for ESP32."
65                    .into()
66            }
67            t if t.contains("plumber") => {
68                "Side projects help tradespeople automate billing and scheduling. \
69                 Low-code tools like AppSheet let plumbers build apps without coding. \
70                 One plumber built a leak-detection IoT sensor with a Raspberry Pi."
71                    .into()
72            }
73            _ => "Raspberry Pi runs Node-RED for home automation wiring diagrams. \
74                 Electricians use it to monitor panel loads in real time. \
75                 The $35 price point makes it practical for small shops."
76                .into(),
77        };
78
79        Ok((state, Outcome::Continue))
80    }
81}
82
83/// Writes (or rewrites) the article draft.
84///
85/// On the first pass, the system prompt is "write from scratch." When the
86/// editor has sent it back with feedback, the system prompt becomes "rewrite
87/// incorporating this feedback" so the LLM understands it is revising, not
88/// starting over.
89struct Writer;
90impl Agent<ArticleState> for Writer {
91    fn name(&self) -> &'static str {
92        "writer"
93    }
94    fn run(&mut self, mut state: ArticleState, ctx: &mut Ctx) -> StepResult<ArticleState> {
95        state.revision += 1;
96
97        let is_rewrite = !state.feedback.is_empty();
98
99        if is_rewrite {
100            ctx.log(format!(
101                "rewriting draft {} for: {} (feedback: {})",
102                state.revision, state.topic, state.feedback
103            ));
104
105            // Stub for rewrite pass
106            // In a real app:
107            // let response = ctx.llm()
108            //     .system(&format!(
109            //         "You are a writer. Rewrite this article incorporating the editor's feedback.\n\
110            //          Guidelines: {}\n\
111            //          Feedback: {}",
112            //         state.guidelines, state.feedback
113            //     ))
114            //     .user(&state.draft)
115            //     .send()?;
116            // state.draft = response;
117
118            state.draft = format!(
119                "# {}\n\n\
120                 Ever wonder how {} is changing the trades?\n\n\
121                 {}",
122                state.topic,
123                state.topic.to_lowercase(),
124                state.research,
125            );
126            state.feedback.clear();
127        } else {
128            ctx.log(format!(
129                "writing draft {} for: {}",
130                state.revision, state.topic
131            ));
132
133            // Stub for first draft
134            // In a real app:
135            // let response = ctx.llm()
136            //     .system(&format!(
137            //         "You are a writer. Write a short article based on the research notes.\n\
138            //          Guidelines: {}",
139            //         state.guidelines
140            //     ))
141            //     .user(&format!("Topic: {}\n\nResearch:\n{}", state.topic, state.research))
142            //     .send()?;
143            // state.draft = response;
144
145            state.draft = format!(
146                "# {}\n\n\
147                 {} is a interesting topic that many people are talking about.\n\n\
148                 {}",
149                state.topic, state.topic, state.research,
150            );
151        }
152
153        Ok((state, Outcome::Continue))
154    }
155}
156
157/// Reviews the draft against the writing guidelines and the author's voice.
158/// Approves or sends it back to the writer with specific feedback.
159struct Editor;
160impl Agent<ArticleState> for Editor {
161    fn name(&self) -> &'static str {
162        "editor"
163    }
164    fn run(&mut self, mut state: ArticleState, ctx: &mut Ctx) -> StepResult<ArticleState> {
165        ctx.log(format!("reviewing rev {}: {}", state.revision, state.topic));
166
167        // Stub: check the draft against guidelines
168        // In a real app:
169        // let response = ctx.llm()
170        //     .system(&format!(
171        //         "You are an editor. Review this article against the guidelines.\n\
172        //          Guidelines: {}\n\n\
173        //          If the article passes, respond with exactly: APPROVED\n\
174        //          Otherwise list the specific changes needed.",
175        //         state.guidelines
176        //     ))
177        //     .user(&state.draft)
178        //     .send()?;
179        //
180        // if response.contains("APPROVED") { ... } else { state.feedback = response; }
181
182        // Stub logic: first draft always needs work, second passes
183        if state.revision < 2 {
184            state.feedback =
185                "opening is bland, needs a hook. 'a interesting' should be 'an interesting'".into();
186            ctx.log(format!("needs revision: {}", state.feedback));
187            Ok((state, Outcome::Next("writer")))
188        } else {
189            ctx.log(format!("approved: {}", state.topic));
190            Ok((state, Outcome::Done))
191        }
192    }
193}
194
195// ---------------------------------------------------------------------------
196// Main
197// ---------------------------------------------------------------------------
198
199fn main() {
200    let topics = vec![
201        "Rust in embedded systems".to_string(),
202        "Why plumbers love side projects".to_string(),
203        "Electricians using Raspberry Pi on the job".to_string(),
204    ];
205
206    // Shared guidelines for all writers -- the author's voice and style rules
207    let guidelines = "\
208        Write in first person. \
209        Do not use emdashes. \
210        Add a touch of humor. \
211        Keep it under 300 words."
212        .to_string();
213
214    println!("=== Fan-out: {} threads ===\n", topics.len());
215
216    // Fan-out: spawn one thread per topic
217    let handles: Vec<_> = topics
218        .into_iter()
219        .enumerate()
220        .map(|(i, topic)| {
221            let guidelines = guidelines.clone();
222
223            thread::spawn(move || {
224                // Each thread gets its own Ctx and Runner -- no shared mutable state
225                let mut ctx = Ctx::new();
226
227                let wf = Workflow::builder("write-article")
228                    .register(Researcher)
229                    .register(Writer)
230                    .register(Editor)
231                    .start_at("researcher")
232                    .then("writer")
233                    .then("editor")
234                    .build()
235                    .unwrap();
236
237                let mut runner = Runner::new(wf).with_max_retries(5);
238
239                let result = runner.run(ArticleState::new(topic.clone(), guidelines), &mut ctx);
240
241                // Print the log from this thread's pipeline
242                for entry in ctx.logs() {
243                    println!("  [thread {}] {}", i, entry);
244                }
245
246                result
247            })
248        })
249        .collect();
250
251    // Fan-in: join all threads, collect results
252    let mut finished = Vec::new();
253    for handle in handles {
254        match handle.join().unwrap() {
255            Ok(state) => finished.push(state),
256            Err(e) => eprintln!("thread failed: {e}"),
257        }
258    }
259
260    // Show results
261    println!("\n=== Fan-in: {} articles ===\n", finished.len());
262    for (i, article) in finished.iter().enumerate() {
263        let preview: String = article.draft.chars().take(72).collect();
264        println!(
265            "  {}. {} (rev {})\n     {preview}...\n",
266            i + 1,
267            article.topic,
268            article.revision
269        );
270    }
271}