cloudllm 0.6.1

A batteries-included Rust toolkit for building intelligent agents with LLM integration, multi-protocol tool support, and multi-agent orchestration.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
//! Four-Agent Panel with Moderator and Shared Tools
//!
//! This example demonstrates a sophisticated multi-agent system using the Council API
//! to estimate global CO₂ emissions from Bitcoin mining. It showcases:
//!
//! - **Parallel execution**: Three workers run simultaneously via Council.Parallel mode
//! - **Two iterative rounds**: Independent work (Round 1) → Moderator feedback → Revisions (Round 2)
//! - **Shared tools**: Memory (KV store), Calculator (mathematical operations)
//! - **Agent autonomy**: LLM models decide which tools to use based on tasks
//! - **Structured outputs**: JSON Memory keys with proper units and ISO-8601 timestamps
//!
//! Workers operate in parallel during each round, storing results in shared Memory.
//! The moderator then validates, provides feedback, and synthesizes the final report.

use cloudllm::clients::grok::GrokClient;
use cloudllm::clients::openai::{Model, OpenAIClient};
use cloudllm::tool_protocol::ToolRegistry;
use cloudllm::tool_protocol::{ToolMetadata, ToolParameter, ToolParameterType, ToolResult};
use cloudllm::tool_protocols::{CustomToolProtocol, MemoryProtocol};
use cloudllm::tools::Memory;
use cloudllm::{
    council::{Council, CouncilMode},
    Agent,
};
use std::collections::HashMap;
use std::sync::Arc;

#[allow(dead_code)]
struct PanelWorkflow {
    memory: Arc<Memory>,
    memory_protocol: Arc<MemoryProtocol>,
    custom_protocol: Arc<CustomToolProtocol>,
    worker_a: Agent,
    worker_b: Agent,
    worker_c: Agent,
    moderator: Agent,
}

impl PanelWorkflow {
    async fn new(
        api_key_grok: &str,
        api_key_openai: &str,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let memory = Arc::new(Memory::new());

        // Create shared Memory tool access for all agents
        let memory_protocol = Arc::new(MemoryProtocol::new(memory.clone()));

        // Create a custom tool protocol with Calculator tool
        let custom_protocol = Arc::new(CustomToolProtocol::new());

        // Register Calculator tool using the actual Calculator implementation
        use cloudllm::tools::Calculator;
        let calculator = Arc::new(Calculator::new());
        let calculator_clone = calculator.clone();
        custom_protocol
            .register_tool(
                ToolMetadata::new(
                    "calculator",
                    "Performs mathematical calculations with arbitrary precision",
                )
                .with_parameter(
                    ToolParameter::new("expression", ToolParameterType::String)
                        .with_description(
                            "Mathematical expression to evaluate (e.g., '650*1e6*25*24/1000')",
                        )
                        .required(),
                ),
                Arc::new(move |params| {
                    let expr = params["expression"].as_str().unwrap_or("0");
                    // Use the actual Calculator tool to evaluate the expression
                    // Note: We use tokio::runtime to block on the async call from a sync context
                    let result =
                        tokio::runtime::Handle::current().block_on(calculator_clone.evaluate(expr));
                    match result {
                        Ok(value) => Ok(ToolResult {
                            success: true,
                            output: serde_json::json!({ "result": value, "expression": expr }),
                            error: None,
                            metadata: HashMap::new(),
                        }),
                        Err(e) => Ok(ToolResult {
                            success: false,
                            output: serde_json::json!({ "expression": expr }),
                            error: Some(format!("Calculation error: {}", e)),
                            metadata: HashMap::new(),
                        }),
                    }
                }),
            )
            .await;

        // Worker A: Data Collector (uses Memory)
        let registry_a = Arc::new(ToolRegistry::new(memory_protocol.clone()));
        let worker_a = Agent::new(
            "worker_a",
            "Data Collector",
            Arc::new(GrokClient::new_with_model_str(api_key_grok, "grok-4")),
        )
        .with_expertise(
            "Fetches current global Bitcoin hashrate and energy efficiency metrics from primary sources"
        )
        .with_personality("Meticulous, data-driven, focuses on source quality and recency")
        .with_tools(registry_a);

        // Worker B: Energy Analyst (uses Calculator + Memory)
        let registry_b = Arc::new(ToolRegistry::new(custom_protocol.clone()));
        let worker_b = Agent::new(
            "worker_b",
            "Energy Analyst",
            Arc::new(GrokClient::new_with_model_str(api_key_grok, "grok-4")),
        )
        .with_expertise(
            "Converts hashrate and efficiency data into daily energy consumption (kWh/day) using precise unit conversion"
        )
        .with_personality("Systematic, careful with unit conversions, emphasizes precision in calculations")
        .with_tools(registry_b);

        // Worker C: Emissions Analyst (uses Memory + Calculator)
        let registry_c = Arc::new(ToolRegistry::new(memory_protocol.clone()));
        let worker_c = Agent::new(
            "worker_c",
            "Emissions Analyst",
            Arc::new(GrokClient::new_with_model_str(api_key_grok, "grok-4")),
        )
        .with_expertise(
            "Fetches global electricity emission factors and computes CO₂ emissions in tons per day"
        )
        .with_personality("Thorough, questions assumptions, provides uncertainty bounds")
        .with_tools(registry_c);

        // Moderator: Verifier & Integrator (uses Memory)
        let moderator_registry = Arc::new(ToolRegistry::new(memory_protocol.clone()));
        let moderator = Agent::new(
            "moderator",
            "Verifier & Integrator",
            Arc::new(OpenAIClient::new_with_model_enum(
                api_key_openai,
                Model::GPT41Nano,
            )),
        )
        .with_expertise(
            "Validates data recency, unit coherence, magnitude sanity; integrates and audits multi-agent findings"
        )
        .with_personality("Skeptical, audit-focused, demands reproducibility and clarity")
        .with_tools(moderator_registry);

        Ok(Self {
            memory,
            memory_protocol,
            custom_protocol,
            worker_a,
            worker_b,
            worker_c,
            moderator,
        })
    }

    async fn run_round_1(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("\n╔════════════════════════════════════════════════════════════════╗");
        println!("║              ROUND 1: PARALLEL INDEPENDENT WORK               ║");
        println!("╚════════════════════════════════════════════════════════════════╝\n");

        println!("🔄 Launching three workers in PARALLEL...\n");

        // Create a council for parallel worker execution
        let mut council = Council::new("workers-panel", "Worker Analysis Panel")
            .with_mode(CouncilMode::Parallel)
            .with_max_tokens(4096);

        council.add_agent(self.worker_a.clone())?;
        council.add_agent(self.worker_b.clone())?;
        council.add_agent(self.worker_c.clone())?;

        let prompt = r#"You are part of a three-agent research team analyzing Bitcoin mining CO₂ emissions.

WORKER A (Data Collector): Research and fetch CURRENT global Bitcoin metrics:
- Global hashrate in EH/s (exahashes per second)
- Energy efficiency in J/TH (joules per terahash)
Store findings in Memory with keys: r1/source.hashrate, r1/source.energy_per_ths

WORKER B (Energy Analyst): Read Worker A's findings, calculate daily energy:
- TH/s = EH/s × 10^6
- Power (W) = TH/s × (J/TH)
- kWh/day = Power (W) × 24 / 1000
Use the Calculator tool for math. Store in Memory: r1/energy.kwh_per_day

WORKER C (Emissions Analyst): Research global electricity emission factor:
- Find kgCO2/kWh (current global average)
- Read r1/energy.kwh_per_day from Memory
- Calculate: CO₂ tons/day = kWh/day × kgCO2/kWh / 1000
Store in Memory: r1/emissions.tons_per_day, r1/source.co2_factor

ALL WORKERS: Use Memory tool to store findings with:
- Exact values and source URLs
- ISO-8601 timestamps
- Any uncertainty notes

Execute your task independently. Do NOT wait for others.
"#;

        let response = council.discuss(prompt, 1).await?;

        println!("✓ Round 1 complete. All workers completed in parallel:");
        for msg in &response.messages {
            if let Some(agent_id) = &msg.agent_id {
                println!(
                    "\n  📌 {}: {}",
                    agent_id,
                    &msg.content[..100.min(msg.content.len())]
                );
            }
        }

        println!("\n✓ All worker findings stored in Memory under r1/* namespace.\n");
        Ok(())
    }

    async fn run_moderator_review_round_1(
        &self,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("\n╔════════════════════════════════════════════════════════════════╗");
        println!("║               MODERATOR ROUND 1: VALIDATION & FEEDBACK        ║");
        println!("╚════════════════════════════════════════════════════════════════╝\n");

        let task_moderator = r#"
You are the moderator validating Round 1 findings stored in Memory. Your task:

1. Read ALL r1/source.* and r1/energy.* and r1/emissions.* values from Memory
2. Validate:
   - Timestamps ≤48 hours old? ✓
   - Units coherent? (EH/s → TH/s → W → kWh/day) ✓
   - Magnitudes reasonable? (Bitcoin hashrate ~600-800 EH/s typical)
   - Primary sources used (not blogs)? ✓
   - Raw data and trails present? ✓

3. Generate feedback for each worker:
   - Issues found (if any)
   - Specific actions for Round 2
   - Permission grants for Round 2 read/write

4. Store feedback/r1 in Memory as JSON with per-worker entries

Allow Round-2: Workers may now read r1/source.* and r1/energy.* for validation.
"#;

        let response = self
            .moderator
            .generate(
                "You are validating agent work. Use Memory to read findings and store feedback.",
                task_moderator,
                &[],
            )
            .await?;

        println!("✓ Moderator feedback generated:\n{}\n", response);
        Ok(())
    }

    async fn run_round_2(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("\n╔════════════════════════════════════════════════════════════════╗");
        println!("║          ROUND 2: PARALLEL REVISIONS WITH BOUNDS              ║");
        println!("╚════════════════════════════════════════════════════════════════╝\n");

        println!("🔄 Launching three workers in PARALLEL for revisions...\n");

        // Create council for parallel Round 2 execution
        let mut council = Council::new("workers-r2", "Worker Revision Panel")
            .with_mode(CouncilMode::Parallel)
            .with_max_tokens(4096);

        council.add_agent(self.worker_a.clone())?;
        council.add_agent(self.worker_b.clone())?;
        council.add_agent(self.worker_c.clone())?;

        let prompt = r#"
Round 2: Refine your Round 1 estimates with uncertainty bounds.

1. Read feedback/r1 from Memory to see moderator comments
2. Read your r1/* entries and refine them with bounds
3. Provide:
   - Central value (your best estimate)
   - Low bound (conservative)
   - High bound (optimistic)
   - Confidence score (0-1)

WORKER A: Store r2/source.hashrate.v2 and r2/source.energy_per_ths.v2 with bounds
WORKER B: Store r2/energy.kwh_per_day.v2 with low/mid/high calculations using Calculator
WORKER C: Store r2/source.co2_factor.v2 and r2/emissions.tons_per_day.v2 with ranges

ALL: Use ISO-8601 timestamps, include sources and confidence scores.

Execute in parallel. Store results using Memory tool with .v2 suffix.
"#;

        let response = council.discuss(prompt, 1).await?;

        println!("✓ Round 2 complete. All workers revised estimates in parallel:");
        for msg in &response.messages {
            if let Some(agent_id) = &msg.agent_id {
                println!(
                    "\n  📌 {}: {}",
                    agent_id,
                    &msg.content[..100.min(msg.content.len())]
                );
            }
        }

        println!("\n✓ All refined estimates stored in Memory under r2/*.v2 namespace.\n");
        Ok(())
    }

    async fn run_moderator_finalization(
        &self,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("\n╔════════════════════════════════════════════════════════════════╗");
        println!("║          MODERATOR ROUND 2: FINALIZATION & SYNTHESIS          ║");
        println!("╚════════════════════════════════════════════════════════════════╝\n");

        let task_final = r#"
Final task: Synthesize all Round 2 findings and create integrated report.

1. Read all r2/*.v2 entries from Memory (refined Round 2 data)
2. Assemble final/report with:
   - Central values: hashrate_ehs, energy_per_ths_j, energy_kwh_per_day, co2_factor_kg_per_kwh, emissions_tons_per_day
   - Ranges: energy_kwh_per_day {low, high}, emissions_tons_per_day {low, high}
   - Assumptions list (5 key assumptions about this analysis)
   - Sources: array of {metric, url}
   - Confidence: weighted average (0-1)
   - Timestamp: ISO-8601

3. Create final/summary - concise one-liner:
   "Estimated ≈XXXk tCO₂/day (±YY% range), using H=XXX EH/s, η=XX J/TH, EF=X.XX kgCO₂/kWh; confidence ZZ%."

4. Create meta/current - canonical version pointers:
   {"hashrate": "r2/source.hashrate.v2", "energy_per_ths": "r2/source.energy_per_ths.v2", ...}

Store all three in Memory (no TTL for final outputs).
"#;

        let response = self
            .moderator
            .generate(
                "You have Memory access. Create final integrated report from Round 2 data.",
                task_final,
                &[],
            )
            .await?;

        println!("✓ Moderator finalized:\n{}\n", response);
        Ok(())
    }

    async fn dump_memory_state(&self) {
        println!("\n╔════════════════════════════════════════════════════════════════╗");
        println!("║              MEMORY STATE (All Stored Keys & Values)           ║");
        println!("╚════════════════════════════════════════════════════════════════╝\n");

        let keys = self.memory.list_keys();
        println!("Total keys stored: {}\n", keys.len());

        for key in keys {
            if let Some((value, _metadata)) = self.memory.get(&key, false) {
                println!("📌 {}", key);
                // Truncate very long values
                let display_value = if value.len() > 400 {
                    format!("{}...", &value[..400])
                } else {
                    value
                };
                println!("   {}\n", display_value);
            }
        }
    }
}

#[tokio::main]
async fn main() {
    cloudllm::init_logger();

    let api_key_grok =
        std::env::var("XAI_API_KEY").unwrap_or_else(|_| "xai-placeholder".to_string());
    let api_key_openai =
        std::env::var("OPENAI_API_KEY").unwrap_or_else(|_| "sk-placeholder".to_string());

    println!("\n╔════════════════════════════════════════════════════════════════╗");
    println!("║    FOUR-AGENT PANEL WITH MODERATOR & PARALLEL EXECUTION      ║");
    println!("║     Estimating Global CO₂ from Bitcoin Mining (tons/day)      ║");
    println!("╚════════════════════════════════════════════════════════════════╝");

    println!("\n📋 PANEL CONFIGURATION:");
    println!("   Workers (Grok-based, RUN IN PARALLEL):");
    println!("   ├─ Worker A (Data Collector): Researches hashrate & efficiency");
    println!("   ├─ Worker B (Energy Analyst): Calculates kWh/day");
    println!("   └─ Worker C (Emissions Analyst): Estimates CO₂/day");
    println!("   ");
    println!("   Moderator (OpenAI GPT-4.1): Validates, provides feedback, synthesizes output");
    println!("   ");
    println!("   Shared Tools: Memory (KV store), Calculator (math operations)");
    println!("   ");
    println!("   Execution: Council.Parallel for workers, single agent for moderator");
    println!("   Each round: workers run simultaneously, then moderator reviews.");

    match PanelWorkflow::new(&api_key_grok, &api_key_openai).await {
        Ok(panel) => {
            // Execute two-round workflow
            if let Err(e) = panel.run_round_1().await {
                eprintln!("Error in Round 1: {}", e);
                return;
            }

            if let Err(e) = panel.run_moderator_review_round_1().await {
                eprintln!("Error in Moderator Round 1 review: {}", e);
                return;
            }

            if let Err(e) = panel.run_round_2().await {
                eprintln!("Error in Round 2: {}", e);
                return;
            }

            if let Err(e) = panel.run_moderator_finalization().await {
                eprintln!("Error in Moderator finalization: {}", e);
                return;
            }

            // Dump all memory state
            panel.dump_memory_state().await;

            println!("\n✅ WORKFLOW COMPLETE");
            println!("All outputs stored in Memory KV store under namespaces:");
            println!("   r1/* — Round 1 independent analyses");
            println!("   r2/* — Round 2 refined estimates with bounds (.v2 suffix)");
            println!("   final/* — Final report and summary");
            println!("   meta/* — Canonical version pointers");
            println!("   feedback/* — Moderator feedback");
        }
        Err(e) => {
            eprintln!("Failed to initialize panel workflow: {}", e);
        }
    }
}