json_mcp_server/json_tools/streaming.rs

1use crate::mcp::protocol::{Tool, ToolCall, ToolResult};
2use crate::mcp::server::ToolHandler;
3use async_trait::async_trait;
4use serde_json::{json, Value};
5use std::collections::HashMap;
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8
/// Stateless handler that exposes the `json-read` tool.
///
/// `Default` is derived so the type satisfies clippy's `new_without_default`
/// alongside the inherent `new` constructor.
#[derive(Debug, Default, Clone, Copy)]
pub struct JsonStreaming;
10
11impl JsonStreaming {
12    pub fn new() -> Self {
13        Self
14    }
15
16    fn create_stream_read_tool() -> Tool {
17        Tool {
18            name: "json-read".to_string(),
19            description: "Read and query JSON files efficiently. Supports files of any size through automatic streaming, with optional JSONPath filtering for data extraction.".to_string(),
20            input_schema: json!({
21                "type": "object",
22                "properties": {
23                    "file_path": {
24                        "type": "string",
25                        "description": "Path to the large JSON file to stream"
26                    },
27                    "query": {
28                        "type": "string",
29                        "description": "Optional JSONPath expression to filter data during streaming"
30                    },
31                    "limit": {
32                        "type": "integer",
33                        "description": "Maximum number of results to return (default: 1000)",
34                        "default": 1000,
35                        "minimum": 1,
36                        "maximum": 10000
37                    },
38                    "offset": {
39                        "type": "integer", 
40                        "description": "Number of results to skip (default: 0)",
41                        "default": 0,
42                        "minimum": 0
43                    }
44                },
45                "required": ["file_path"]
46            })
47        }
48    }
49
50    async fn handle_stream_read(&self, args: &HashMap<String, Value>) -> anyhow::Result<ToolResult> {
51        let file_path = args.get("file_path")
52            .and_then(|v| v.as_str())
53            .ok_or_else(|| anyhow::anyhow!(
54                "file_path is required. Usage example:\n{{\n  \"file_path\": \"./data.json\"\n}}\nOptional parameters: query, limit, offset"
55            ))?;
56
57        let query = args.get("query").and_then(|v| v.as_str());
58        let limit = args.get("limit")
59            .and_then(|v| v.as_u64())
60            .unwrap_or(1000) as usize;
61        let offset = args.get("offset")
62            .and_then(|v| v.as_u64())
63            .unwrap_or(0) as usize;
64
65        // Try to stream the file
66        let results = self.stream_json_file(file_path, query, limit, offset)?;
67
68        let output = serde_json::to_string_pretty(&results)?;
69
70        Ok(ToolResult::success(format!(
71            "Streamed {} results from '{}' (offset: {}, limit: {}):\n\n{}",
72            match &results {
73                Value::Array(arr) => arr.len(),
74                _ => 1,
75            },
76            file_path,
77            offset,
78            limit,
79            output
80        )))
81    }
82
83    fn stream_json_file(
84        &self,
85        file_path: &str,
86        query: Option<&str>,
87        limit: usize,
88        offset: usize,
89    ) -> anyhow::Result<Value> {
90        let file = File::open(file_path)
91            .map_err(|e| anyhow::anyhow!("Failed to open file '{}': {}", file_path, e))?;
92
93        let reader = BufReader::new(file);
94        let mut results = Vec::new();
95        let mut current_offset = 0;
96        let mut found_results = 0;
97
98        // Try to detect if this is a line-delimited JSON file
99        let mut lines = reader.lines();
100        let mut is_line_delimited = false;
101
102        // Read first few lines to detect format
103        let mut first_lines = Vec::new();
104        for _ in 0..5 {
105            if let Some(Ok(line)) = lines.next() {
106                let line_clone = line.clone();
107                first_lines.push(line);
108                if line_clone.trim().starts_with('{') && line_clone.trim().ends_with('}') {
109                    if serde_json::from_str::<Value>(&line_clone).is_ok() {
110                        is_line_delimited = true;
111                        break;
112                    }
113                }
114            } else {
115                break;
116            }
117        }
118
119        if is_line_delimited {
120            // Process line-delimited JSON
121            let file = File::open(file_path)?;
122            let reader = BufReader::new(file);
123            
124            for line in reader.lines() {
125                let line = line?;
126                if line.trim().is_empty() {
127                    continue;
128                }
129
130                if current_offset < offset {
131                    current_offset += 1;
132                    continue;
133                }
134
135                if found_results >= limit {
136                    break;
137                }
138
139                if let Ok(json_value) = serde_json::from_str::<Value>(&line) {
140                    let should_include = if let Some(query_str) = query {
141                        // Apply JSONPath query to individual line
142                        match jsonpath_rust::JsonPathFinder::from_str(&line, query_str) {
143                            Ok(finder) => {
144                                let result = finder.find();
145                                match result {
146                                    Value::Null => false,
147                                    Value::Array(ref arr) if arr.is_empty() => false,
148                                    _ => true,
149                                }
150                            },
151                            Err(_) => false,
152                        }
153                    } else {
154                        true
155                    };
156
157                    if should_include {
158                        results.push(json_value);
159                        found_results += 1;
160                    }
161                }
162                current_offset += 1;
163            }
164        } else {
165            // Try to parse as regular JSON file and stream through it
166            let content = std::fs::read_to_string(file_path)?;
167            let json_value: Value = serde_json::from_str(&content)?;
168
169            // If it's an array, we can stream through elements
170            if let Value::Array(arr) = json_value {
171                for (_index, item) in arr.iter().enumerate() {
172                    if current_offset < offset {
173                        current_offset += 1;
174                        continue;
175                    }
176
177                    if found_results >= limit {
178                        break;
179                    }
180
181                    let should_include = if let Some(query_str) = query {
182                        let item_str = serde_json::to_string(item)?;
183                        match jsonpath_rust::JsonPathFinder::from_str(&item_str, query_str) {
184                            Ok(finder) => {
185                                let result = finder.find();
186                                match result {
187                                    Value::Null => false,
188                                    Value::Array(ref arr) if arr.is_empty() => false,
189                                    _ => true,
190                                }
191                            },
192                            Err(_) => false,
193                        }
194                    } else {
195                        true
196                    };
197
198                    if should_include {
199                        results.push(item.clone());
200                        found_results += 1;
201                    }
202                    current_offset += 1;
203                }
204            } else {
205                // Single object - apply query if provided
206                let should_include = if let Some(query_str) = query {
207                    match jsonpath_rust::JsonPathFinder::from_str(&content, query_str) {
208                        Ok(finder) => {
209                            let result = finder.find();
210                            match result {
211                                Value::Null => false,
212                                Value::Array(ref arr) if arr.is_empty() => false,
213                                _ => true,
214                            }
215                        },
216                        Err(_) => false,
217                    }
218                } else {
219                    true
220                };
221
222                if should_include && current_offset >= offset && found_results < limit {
223                    results.push(json_value);
224                }
225            }
226        }
227
228        Ok(Value::Array(results))
229    }
230}
231
232#[async_trait]
233impl ToolHandler for JsonStreaming {
234    async fn get_tools(&self) -> anyhow::Result<Vec<Tool>> {
235        Ok(vec![Self::create_stream_read_tool()])
236    }
237
238    async fn call_tool(&self, tool_call: ToolCall) -> anyhow::Result<ToolResult> {
239        match tool_call.name.as_str() {
240            "json-read" => self.handle_stream_read(&tool_call.arguments).await,
241            _ => Ok(ToolResult::error(format!("Unknown tool: {}", tool_call.name))),
242        }
243    }
244}