langgraph-prebuilt 0.2.4

Prebuilt agents, layouts and tools for LangGraph applications in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
//! Helper functions for building graph nodes with minimal boilerplate.
//!
//! These utilities eliminate the manual JSON ↔ typed conversion that makes
//! Rust examples verbose compared to Python's langchain-core.

use std::io::Write;
use serde_json::Value as JsonValue;
use tokio_stream::StreamExt;
use langgraph_checkpoint::config::RunnableConfig;
use langgraph::config::get_stream_writer;
use langgraph::runnable::RunnableError;
use langgraph::stream::StreamPart;
use langgraph::types::StreamMode;

use crate::traits::BaseChatModel;
use crate::types::Message;

/// Extract typed messages from a graph state JSON, optionally prepending a system prompt.
///
/// Reads the `"messages"` array of `input`, deserializes each entry into a
/// [`Message`], and returns them in order. When `system_prompt` is `Some`, a
/// system message built from it is placed first.
///
/// Entries that fail to deserialize are skipped silently. A missing or
/// non-array `"messages"` field yields just the (optional) system message.
///
/// Replaces the common hand-written pattern of fetching the array, cloning it,
/// and looping over `serde_json::from_value` calls:
/// ```ignore
/// let messages = extract_messages(&input, Some("You are a helpful assistant."));
/// ```
pub fn extract_messages(input: &JsonValue, system_prompt: Option<&str>) -> Vec<Message> {
    // Borrow the history instead of cloning the whole array up front; each entry
    // is cloned exactly once below because `serde_json::from_value` needs an
    // owned `Value`.
    let history: &[JsonValue] = input
        .get("messages")
        .and_then(|m| m.as_array())
        .map(|arr| arr.as_slice())
        .unwrap_or(&[]);

    let mut messages = Vec::with_capacity(history.len() + 1);

    if let Some(prompt) = system_prompt {
        messages.push(Message::system(prompt));
    }

    messages.extend(
        history
            .iter()
            .filter_map(|msg| serde_json::from_value::<Message>(msg.clone()).ok()),
    );

    messages
}

/// Wrap a single model response as a graph state update.
///
/// The message is serialized and placed in a one-element list under
/// `"messages"`, i.e. `{"messages": [response]}`.
///
/// # Errors
/// Returns `RunnableError::Node` carrying the serializer's message if the
/// response cannot be converted to JSON.
pub fn llm_response_to_json(response: Message) -> Result<JsonValue, RunnableError> {
    match serde_json::to_value(response) {
        Ok(message) => Ok(serde_json::json!({ "messages": [message] })),
        Err(err) => Err(RunnableError::Node(err.to_string())),
    }
}

/// Invoke an LLM and return a state update.
///
/// This is the complete LLM node logic in one call:
/// 1. Extracts messages from input state
/// 2. Prepends system prompt
/// 3. Calls the model
/// 4. Wraps response in state update format
///
/// # Example
/// ```ignore
/// let model_clone = model.clone();
/// graph.add_node("chatbot", move |input: JsonValue, _config: RunnableConfig| {
///     let model = model_clone.clone();
///     async move { invoke_llm(model.as_ref(), &input, "You are a helpful assistant.") }
/// })?;
/// ```
pub fn invoke_llm(
    model: &dyn BaseChatModel,
    input: &JsonValue,
    system_prompt: &str,
) -> Result<JsonValue, RunnableError> {
    let messages = extract_messages(input, Some(system_prompt));
    let response = model.invoke(&messages, &RunnableConfig::new())
        .map_err(|e| RunnableError::Node(e.to_string()))?;
    llm_response_to_json(response)
}

/// Invoke an LLM with a custom config and return a state update.
///
/// Same as [`invoke_llm`] but allows passing a custom config (e.g., for streaming).
pub fn invoke_llm_with_config(
    model: &dyn BaseChatModel,
    input: &JsonValue,
    system_prompt: &str,
    config: &RunnableConfig,
) -> Result<JsonValue, RunnableError> {
    let messages = extract_messages(input, Some(system_prompt));
    let response = model.invoke(messages.as_slice(), config)
        .map_err(|e| RunnableError::Node(e.to_string()))?;
    llm_response_to_json(response)
}

/// Stream LLM tokens via StreamWriter and return the final state update.
///
/// Calls `model.astream()` for token-by-token streaming. Each partial message
/// is forwarded through the stream writer (if active) as a JSON payload:
/// ```json
/// {"type": "token", "content": "Hello"}
/// ```
///
/// The final complete message is returned as a state update in
/// `{"messages": [response]}` format.
///
/// # Example
/// ```ignore
/// let model_clone = model.clone();
/// graph.add_node("chatbot", move |input: JsonValue, _config: RunnableConfig| {
///     let model = model_clone.clone();
///     async move { stream_llm(model.as_ref(), &input, "You are a helpful assistant.").await }
/// })?;
/// ```
pub async fn stream_llm(
    model: &(dyn BaseChatModel + Send + Sync),
    input: &JsonValue,
    system_prompt: &str,
) -> Result<JsonValue, RunnableError> {
    let messages = extract_messages(input, Some(system_prompt));
    let writer = get_stream_writer();

    let config = RunnableConfig::new();
    let mut stream = model.astream(&messages, &config);
    let mut accumulated_thinking = String::new();
    let mut accumulated_content = String::new();
    let mut tool_calls_message = None;

    // Standard incremental streaming (same as LangChain / OpenAI SDK):
    //   - Each chunk yielded by the provider contains ONLY new delta tokens.
    //   - If tool calls are present, the provider yields ONE final signal chunk
    //     at the very end with has_tool_calls()=true and empty content/thinking.
    //     This lets us detect tool calls without re-printing any content.
    //   - We forward every content/thinking delta directly to the stream writer,
    //     and accumulate them ourselves for the final return value.
    while let Some(result) = stream.next().await {
        let chunk = result.map_err(|e| RunnableError::Node(e.to_string()))?;

        if chunk.has_tool_calls() {
            // Tool-calls signal chunk — no content to print, just capture it.
            tool_calls_message = Some(chunk);
        } else {
            // Pure delta chunk — forward to stream writer and accumulate.
            if let Some(ref w) = writer {
                if let Some(thinking) = chunk.thinking() {
                    if !thinking.is_empty() {
                        let _ = w.try_send(serde_json::json!({
                            "type": "thinking",
                            "content": thinking,
                        }));
                    }
                }
                if let Some(content) = chunk.text() {
                    if !content.is_empty() {
                        let _ = w.try_send(serde_json::json!({
                            "type": "token",
                            "content": content,
                        }));
                    }
                }
            }
            if let Some(thinking) = chunk.thinking() {
                accumulated_thinking.push_str(thinking);
            }
            if let Some(content) = chunk.text() {
                accumulated_content.push_str(content);
            }
        }
    }

    // Build the final Message from accumulated content + tool calls (if any).
    let mut final_message = match tool_calls_message {
        Some(tc_msg) => {
            // Reconstruct with full accumulated content + the assembled tool calls.
            let tool_calls = match tc_msg {
                Message::Ai { tool_calls, .. } => tool_calls,
                _ => vec![],
            };
            Message::ai_with_tool_calls(accumulated_content, tool_calls)
        }
        None => Message::ai(accumulated_content),
    };

    if !accumulated_thinking.is_empty() {
        if let Message::Ai { thinking: ref mut th, .. } = final_message {
            *th = Some(accumulated_thinking);
        }
    }

    llm_response_to_json(final_message)
}

/// Read `input[key]` as an `i64`.
///
/// Returns `0` when the key is absent or its value is not something
/// `serde_json::Value::as_i64` accepts.
pub fn get_i64(input: &JsonValue, key: &str) -> i64 {
    match input.get(key) {
        Some(value) => value.as_i64().unwrap_or(0),
        None => 0,
    }
}

/// Get a field from state as a string, defaulting to "".
pub fn get_str<'a>(input: &'a JsonValue, key: &str) -> &'a str {
    input.get(key).and_then(|v| v.as_str()).unwrap_or("")
}

/// Extract the assistant's text reply from an `invoke_llm` / `stream_llm` result.
///
/// Both helpers return `{"messages": [response]}`. This function digs out the
/// `content` field of the last message so callers don't repeat the same
/// `.get("messages") … .last() … .get("content")` chain every time.
///
/// # Example
/// ```ignore
/// let result = stream_llm(model, &input, "You are a planner.").await?;
/// let text = response_text(&result);
/// println!("LLM said: {}", text);
/// ```
pub fn response_text(result: &JsonValue) -> &str {
    result
        .get("messages")
        .and_then(|m| m.as_array())
        .and_then(|msgs| msgs.last())
        .and_then(|m| m.get("content"))
        .and_then(|c| c.as_str())
        .unwrap_or("")
}

/// Print the final AI answer from an `invoke` / `ainvoke` result, including its thinking.
///
/// This is the non-streaming counterpart of [`print_stream`]. It is shorthand
/// for [`print_result_with_options`] with `show_thinking = true`: thinking (if
/// any) is shown in dim gray, then the answer content in the normal color.
///
/// # Example
/// ```ignore
/// let result = agent.ainvoke(&input, &RunnableConfig::new()).await?;
/// print_result(&result);
/// ```
pub fn print_result(result: &JsonValue) {
    const SHOW_THINKING: bool = true;
    print_result_with_options(result, SHOW_THINKING);
}

/// Like [`print_result`] but with explicit control over thinking display.
///
/// Scans `result["messages"]` from newest to oldest and picks the most recent
/// message with `"type": "ai"` that has something to show: either non-empty
/// string `content`, or a non-empty `tool_calls` array. For that single
/// message it prints:
/// 1. its `thinking` in dim gray (only when `show_thinking` is `true` and the
///    thinking is non-empty), then
/// 2. its content, or `[Called N tool(s)]` if it has no content but made tool calls.
///
/// AI messages with neither content nor tool calls are skipped entirely,
/// including their thinking. Nothing is printed if `"messages"` is missing or
/// no AI message qualifies.
///
/// When `show_thinking` is `false` the thinking block is omitted, matching
/// [`print_stream_with_options`] with `show_thinking = false`.
pub fn print_result_with_options(result: &JsonValue, show_thinking: bool) {
    let messages = match result.get("messages").and_then(|m| m.as_array()) {
        Some(m) => m,
        None => return,
    };

    for msg in messages.iter().rev() {
        if msg.get("type").and_then(|t| t.as_str()) != Some("ai") {
            continue;
        }

        let content = msg
            .get("content")
            .and_then(|c| c.as_str())
            .filter(|c| !c.is_empty());
        let tool_call_count = msg
            .get("tool_calls")
            .and_then(|tc| tc.as_array())
            .map_or(0, |tc| tc.len());

        // Decide whether this is the message to show *before* printing anything,
        // so a skipped message's thinking is never printed in front of an older
        // message's answer.
        if content.is_none() && tool_call_count == 0 {
            continue;
        }

        // Dim gray, same ANSI codes `print_stream` uses for thinking.
        if show_thinking {
            if let Some(thinking) = msg.get("thinking").and_then(|t| t.as_str()) {
                if !thinking.is_empty() {
                    println!("\x1b[2;90m[Thinking] {}\x1b[0m", thinking);
                }
            }
        }

        match content {
            Some(text) => println!("{}", text),
            // Fallback: the last AI turn was a tool-call step.
            None => println!("[Called {} tool(s)]", tool_call_count),
        }
        return;
    }
}

/// Parse JSON from an LLM reply, stripping a markdown code fence (` ```json … ``` `) if present.
///
/// Surrounding whitespace is ignored. If the trimmed text starts with ```` ``` ````:
/// - the rest of the opening fence line (its info string, e.g. `json`) is discarded;
/// - everything from the last closing ```` ``` ```` onward is discarded, so trailing
///   prose after the fence is tolerated;
/// - if there is no closing fence (e.g. a truncated reply), everything after the
///   opening line is parsed.
///
/// Otherwise the text is parsed as-is. Returns `None` when the remaining text is
/// not valid JSON. Never panics, regardless of how the fences are arranged.
///
/// # Example
/// ```ignore
/// let text = "```json\n{\"title\": \"Plan\"}\n```";
/// let value = parse_json_response(text).unwrap();
/// assert_eq!(value["title"], "Plan");
/// ```
pub fn parse_json_response(text: &str) -> Option<JsonValue> {
    let trimmed = text.trim();
    let json_str = match trimmed.strip_prefix("```") {
        Some(after_open) => {
            // Skip the remainder of the opening fence line (the info string).
            // Without any newline, keep everything after the backticks.
            let body = match after_open.find('\n') {
                Some(newline) => &after_open[newline + 1..],
                None => after_open,
            };
            // Look for the closing fence only inside `body`. The opening fence
            // therefore can't be mistaken for it, which previously produced an
            // inverted slice range (and a panic) on unterminated fences.
            match body.rfind("```") {
                Some(close) => &body[..close],
                None => body,
            }
        }
        None => trimmed,
    };
    serde_json::from_str(json_str.trim()).ok()
}

/// Send one prompt to the LLM and parse the reply as JSON.
///
/// Bundles the usual "structured output" node steps:
/// 1. wrap `prompt` as a single human message and call [`stream_llm`] with
///    `system_prompt` (streamed deltas also reach the active stream writer);
/// 2. take the reply text with [`response_text`];
/// 3. parse it with [`parse_json_response`], which strips markdown fences.
///
/// Returns `Ok(None)` when the reply is not valid JSON.
///
/// # Errors
/// Propagates any `RunnableError` from [`stream_llm`].
///
/// # Example
/// ```ignore
/// let plan = ask_json(model, "Create a plan in JSON format", "").await?;
/// ```
pub async fn ask_json(
    model: &(dyn BaseChatModel + Send + Sync),
    prompt: &str,
    system_prompt: &str,
) -> Result<Option<JsonValue>, RunnableError> {
    let request = serde_json::json!({"messages": [{"type": "human", "content": prompt}]});
    let reply = stream_llm(model, &request, system_prompt).await?;
    Ok(parse_json_response(response_text(&reply)))
}

/// Drive a graph stream and echo its output to stdout as it arrives.
///
/// Covers the usual streaming boilerplate in examples:
/// - `StreamMode::Custom` `"token"` payloads are printed inline (typewriter style);
/// - `"thinking"` payloads are printed in dim gray after a `[Thinking]` prefix;
/// - when `print_updates` is `true`, each `StreamMode::Updates` part prints an
///   `[update] Node '…' completed` line per node.
///
/// Equivalent to [`print_stream_with_options`] with `show_thinking = true`.
/// Returns the concatenated `"token"` text (thinking is not included).
///
/// # Example
/// ```ignore
/// use langgraph::prelude::*;
/// use langgraph_prebuilt::print_stream;
///
/// let mut stream = app.astream(&input, &RunnableConfig::new(), vec![StreamMode::Custom, StreamMode::Updates]);
/// let text = print_stream(&mut stream, true).await;
/// println!("Final: {}", text);
/// ```
pub async fn print_stream(
    stream: &mut (impl StreamExt<Item = StreamPart> + Unpin),
    print_updates: bool,
) -> String {
    let show_thinking = true;
    print_stream_with_options(stream, print_updates, show_thinking).await
}

/// Like [`print_stream`] but with explicit control over thinking display.
///
/// When `show_thinking` is `false`, `"thinking"` payloads are ignored. A
/// thinking block is opened with a dim-gray `[Thinking] ` prefix. It is closed
/// (color reset plus newline) when the next answer token arrives, when a printed
/// update arrives, or when the stream ends. Only `"token"` content is collected
/// into the returned string.
pub async fn print_stream_with_options(
    stream: &mut (impl StreamExt<Item = StreamPart> + Unpin),
    print_updates: bool,
    show_thinking: bool,
) -> String {
    /// Close an open thinking block: reset the ANSI color, then start a new line.
    fn end_thinking(in_thinking: &mut bool) {
        if *in_thinking {
            print!("\x1b[0m");
            println!();
            *in_thinking = false;
        }
    }

    let mut answer = String::new();
    let mut in_thinking = false;

    while let Some(part) = stream.next().await {
        match part.mode {
            StreamMode::Custom => {
                let kind = part.data.get("type").and_then(|t| t.as_str());
                let content = part.data.get("content").and_then(|c| c.as_str());
                match kind {
                    Some("thinking") if show_thinking => {
                        if let Some(text) = content {
                            if !in_thinking {
                                // ESC[2;90m = dim + bright black (dark gray).
                                print!("\x1b[2;90m[Thinking] ");
                                in_thinking = true;
                            }
                            print!("{}", text);
                            let _ = std::io::stdout().flush();
                        }
                    }
                    Some("token") => {
                        end_thinking(&mut in_thinking);
                        if let Some(text) = content {
                            print!("{}", text);
                            let _ = std::io::stdout().flush();
                            answer.push_str(text);
                        }
                    }
                    _ => {}
                }
            }
            StreamMode::Updates if print_updates => {
                end_thinking(&mut in_thinking);
                if let Some(obj) = part.data.as_object() {
                    for node_name in obj.keys() {
                        println!("\n[update] Node '{}' completed", node_name);
                    }
                }
            }
            _ => {}
        }
    }

    end_thinking(&mut in_thinking);
    answer
}