// json_mcp_server/json_tools/streaming.rs
use crate::mcp::protocol::{Tool, ToolCall, ToolResult};
2use crate::mcp::server::ToolHandler;
3use async_trait::async_trait;
4use serde_json::{json, Value};
5use std::collections::HashMap;
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8
/// Stateless provider of the `json-read` MCP tool: reads, paginates and
/// (optionally) JSONPath-filters JSON or line-delimited JSON (NDJSON) files.
pub struct JsonStreaming;
10
11impl JsonStreaming {
12 pub fn new() -> Self {
13 Self
14 }
15
16 fn create_stream_read_tool() -> Tool {
17 Tool {
18 name: "json-read".to_string(),
19 description: "Read and query JSON files efficiently. Supports files of any size through automatic streaming, with optional JSONPath filtering for data extraction.".to_string(),
20 input_schema: json!({
21 "type": "object",
22 "properties": {
23 "file_path": {
24 "type": "string",
25 "description": "Path to the large JSON file to stream"
26 },
27 "query": {
28 "type": "string",
29 "description": "Optional JSONPath expression to filter data during streaming"
30 },
31 "limit": {
32 "type": "integer",
33 "description": "Maximum number of results to return (default: 1000)",
34 "default": 1000,
35 "minimum": 1,
36 "maximum": 10000
37 },
38 "offset": {
39 "type": "integer",
40 "description": "Number of results to skip (default: 0)",
41 "default": 0,
42 "minimum": 0
43 }
44 },
45 "required": ["file_path"]
46 })
47 }
48 }
49
50 async fn handle_stream_read(&self, args: &HashMap<String, Value>) -> anyhow::Result<ToolResult> {
51 let file_path = args.get("file_path")
52 .and_then(|v| v.as_str())
53 .ok_or_else(|| anyhow::anyhow!(
54 "file_path is required. Usage example:\n{{\n \"file_path\": \"./data.json\"\n}}\nOptional parameters: query, limit, offset"
55 ))?;
56
57 let query = args.get("query").and_then(|v| v.as_str());
58 let limit = args.get("limit")
59 .and_then(|v| v.as_u64())
60 .unwrap_or(1000) as usize;
61 let offset = args.get("offset")
62 .and_then(|v| v.as_u64())
63 .unwrap_or(0) as usize;
64
65 let results = self.stream_json_file(file_path, query, limit, offset)?;
67
68 let output = serde_json::to_string_pretty(&results)?;
69
70 Ok(ToolResult::success(format!(
71 "Streamed {} results from '{}' (offset: {}, limit: {}):\n\n{}",
72 match &results {
73 Value::Array(arr) => arr.len(),
74 _ => 1,
75 },
76 file_path,
77 offset,
78 limit,
79 output
80 )))
81 }
82
83 fn stream_json_file(
84 &self,
85 file_path: &str,
86 query: Option<&str>,
87 limit: usize,
88 offset: usize,
89 ) -> anyhow::Result<Value> {
90 let file = File::open(file_path)
91 .map_err(|e| anyhow::anyhow!("Failed to open file '{}': {}", file_path, e))?;
92
93 let reader = BufReader::new(file);
94 let mut results = Vec::new();
95 let mut current_offset = 0;
96 let mut found_results = 0;
97
98 let mut lines = reader.lines();
100 let mut is_line_delimited = false;
101
102 let mut first_lines = Vec::new();
104 for _ in 0..5 {
105 if let Some(Ok(line)) = lines.next() {
106 let line_clone = line.clone();
107 first_lines.push(line);
108 if line_clone.trim().starts_with('{') && line_clone.trim().ends_with('}') {
109 if serde_json::from_str::<Value>(&line_clone).is_ok() {
110 is_line_delimited = true;
111 break;
112 }
113 }
114 } else {
115 break;
116 }
117 }
118
119 if is_line_delimited {
120 let file = File::open(file_path)?;
122 let reader = BufReader::new(file);
123
124 for line in reader.lines() {
125 let line = line?;
126 if line.trim().is_empty() {
127 continue;
128 }
129
130 if current_offset < offset {
131 current_offset += 1;
132 continue;
133 }
134
135 if found_results >= limit {
136 break;
137 }
138
139 if let Ok(json_value) = serde_json::from_str::<Value>(&line) {
140 let should_include = if let Some(query_str) = query {
141 match jsonpath_rust::JsonPathFinder::from_str(&line, query_str) {
143 Ok(finder) => {
144 let result = finder.find();
145 match result {
146 Value::Null => false,
147 Value::Array(ref arr) if arr.is_empty() => false,
148 _ => true,
149 }
150 },
151 Err(_) => false,
152 }
153 } else {
154 true
155 };
156
157 if should_include {
158 results.push(json_value);
159 found_results += 1;
160 }
161 }
162 current_offset += 1;
163 }
164 } else {
165 let content = std::fs::read_to_string(file_path)?;
167 let json_value: Value = serde_json::from_str(&content)?;
168
169 if let Value::Array(arr) = json_value {
171 for (_index, item) in arr.iter().enumerate() {
172 if current_offset < offset {
173 current_offset += 1;
174 continue;
175 }
176
177 if found_results >= limit {
178 break;
179 }
180
181 let should_include = if let Some(query_str) = query {
182 let item_str = serde_json::to_string(item)?;
183 match jsonpath_rust::JsonPathFinder::from_str(&item_str, query_str) {
184 Ok(finder) => {
185 let result = finder.find();
186 match result {
187 Value::Null => false,
188 Value::Array(ref arr) if arr.is_empty() => false,
189 _ => true,
190 }
191 },
192 Err(_) => false,
193 }
194 } else {
195 true
196 };
197
198 if should_include {
199 results.push(item.clone());
200 found_results += 1;
201 }
202 current_offset += 1;
203 }
204 } else {
205 let should_include = if let Some(query_str) = query {
207 match jsonpath_rust::JsonPathFinder::from_str(&content, query_str) {
208 Ok(finder) => {
209 let result = finder.find();
210 match result {
211 Value::Null => false,
212 Value::Array(ref arr) if arr.is_empty() => false,
213 _ => true,
214 }
215 },
216 Err(_) => false,
217 }
218 } else {
219 true
220 };
221
222 if should_include && current_offset >= offset && found_results < limit {
223 results.push(json_value);
224 }
225 }
226 }
227
228 Ok(Value::Array(results))
229 }
230}
231
232#[async_trait]
233impl ToolHandler for JsonStreaming {
234 async fn get_tools(&self) -> anyhow::Result<Vec<Tool>> {
235 Ok(vec![Self::create_stream_read_tool()])
236 }
237
238 async fn call_tool(&self, tool_call: ToolCall) -> anyhow::Result<ToolResult> {
239 match tool_call.name.as_str() {
240 "json-read" => self.handle_stream_read(&tool_call.arguments).await,
241 _ => Ok(ToolResult::error(format!("Unknown tool: {}", tool_call.name))),
242 }
243 }
244}