Skip to main content

hematite/tools/
scientific.rs

1use serde_json::Value;
2
3pub async fn scientific_compute(args: &Value) -> Result<String, String> {
4    let mode = args["mode"]
5        .as_str()
6        .ok_or("Missing 'mode' (symbolic, units, complexity, ledger, dataset)")?;
7
8    match mode {
9        "symbolic" => solve_symbolic(args).await,
10        "units" => verify_units(args).await,
11        "complexity" => audit_complexity(args).await,
12        "ledger" => manage_ledger(args).await,
13        "dataset" => calculate_on_dataset(args).await,
14        _ => Err(format!("Unknown scientific mode: {}", mode)),
15    }
16}
17
18async fn solve_symbolic(args: &Value) -> Result<String, String> {
19    let expr = args["expr"]
20        .as_str()
21        .ok_or("Missing 'expr' for symbolic mode")?;
22    let target = args["target"].as_str().unwrap_or("solve"); // solve, simplify, integrate, diff
23    let latex = args["latex"].as_bool().unwrap_or(false);
24
25    let python_script = format!(
26        "import sympy\n\
27         from sympy import symbols, solve, simplify, integrate, diff, Eq, latex\n\
28         # Attempt to find symbols automatically\n\
29         import re\n\
30         raw_expr = r\"{}\"\n\
31         # Extract likely symbols (single letters or words starting with letter)\n\
32         sym_names = set(re.findall(r'\\b[a-zA-Z][a-zA-Z0-9]*\\b', raw_expr))\n\
33         # Remove common functions\n\
34         sym_names -= {{'sin', 'cos', 'tan', 'exp', 'log', 'sqrt', 'pi', 'E', 'oo', 'solve', 'simplify', 'integrate', 'diff'}}\n\
35         sym_dict = {{name: symbols(name) for name in sym_names}}\n\
36         \n\
37         try:\n\
38             if \"=\" in raw_expr and \"{}\" == \"solve\":\n\
39                 lhs, rhs = raw_expr.split(\"=\")\n\
40                 result = solve(Eq(eval(lhs, {{'__builtins__': None}}, sym_dict), eval(rhs, {{'__builtins__': None}}, sym_dict)))\n\
41             else:\n\
42                 expr_obj = eval(raw_expr, {{'__builtins__': None}}, sym_dict)\n\
43                 if \"{}\" == \"simplify\": result = simplify(expr_obj)\n\
44                 elif \"{}\" == \"integrate\": result = integrate(expr_obj)\n\
45                 elif \"{}\" == \"diff\": result = diff(expr_obj)\n\
46                 else: result = solve(expr_obj)\n\
47             \n\
48             print(f\"RESULT: {{result}}\")\n\
49             if {}:\n\
50                 print(f\"LATEX: {{latex(result)}}\")\n\
51         except Exception as e:\n\
52             print(f\"ERROR: {{e}}\")\n",
53        expr, target, target, target, target, latex
54    );
55
56    execute_in_sandbox(&python_script).await
57}
58
59async fn verify_units(args: &Value) -> Result<String, String> {
60    let calculation = args["calculation"]
61        .as_str()
62        .ok_or("Missing 'calculation' for units mode")?;
63
64    let python_script = format!(
65        "try:\n\
66         # Simple Unit System (SI focus)\n\
67         class UnitValue:\n\
68             def __init__(self, val, dims):\n\
69                 self.val = val\n\
70                 self.dims = dims # {{'m': 1, 's': -1, etc}}\n\
71             def __add__(self, other):\n\
72                 if self.dims != other.dims: raise ValueError(f\"Dimension mismatch: {{self.dims}} vs {{other.dims}}\")\n\
73                 return UnitValue(self.val + other.val, self.dims)\n\
74             def __mul__(self, other):\n\
75                 new_dims = self.dims.copy()\n\
76                 for k, v in other.dims.items(): new_dims[k] = new_dims.get(k, 0) + v\n\
77                 return UnitValue(self.val * other.val, new_dims)\n\
78             def __truediv__(self, other):\n\
79                 new_dims = self.dims.copy()\n\
80                 for k, v in other.dims.items(): new_dims[k] = new_dims.get(k, 0) - v\n\
81                 return UnitValue(self.val / other.val, new_dims)\n\
82             def __repr__(self): return f\"{{self.val}} ({{self.dims}})\"\n\
83         \n\
84         # Helper to parse strings like '10m'\n\
85         def u(s):\n\
86             m = __import__('re').match(r'([\\d\\.]+)([a-zA-Z]+)', s)\n\
87             val = float(m.group(1))\n\
88             unit = m.group(2)\n\
89             return UnitValue(val, {{unit: 1}})\n\
90         \n\
91         # Executing the calculation with unit objects\n\
92         # User input is expected to use u('10m') etc.\n\
93         raw_calc = r\"{}\"\n\
94         # Basic auto-wrap for units in the expression if they look like 10m\n\
95         wrapped = __import__('re').sub(r'(\\d+)([a-z]+)', r\"u('\\1\\2')\", raw_calc)\n\
96         result = eval(wrapped, {{'u': u}})\n\
97         print(f\"RESULT: {{result}}\")\n\
98         except Exception as e:\n\
99         print(f\"ERROR: {{e}}\")\n",
100        calculation
101    );
102
103    execute_in_sandbox(&python_script).await
104}
105
106async fn audit_complexity(args: &Value) -> Result<String, String> {
107    let snippet = args["snippet"]
108        .as_str()
109        .ok_or("Missing 'snippet' for complexity mode")?;
110
111    let python_script = format!(
112        "import time\n\
113         import math\n\
114         def run_target(n):\n\
115             {}\n\
116         \n\
117         samples = [10, 50, 100, 200, 500]\n\
118         times = []\n\
119         for n in samples:\n\
120             start = time.perf_counter()\n\
121             run_target(n)\n\
122             times.append(time.perf_counter() - start)\n\
123         \n\
124         # Simplified regression to guess Big-O\n\
125         # Compare growth rates: t/n, t/n^2, t/log(n)\n\
126         ratios_n = [t/n for t, n in zip(times, samples) if n > 0]\n\
127         ratios_n2 = [t/(n**2) for t, n in zip(times, samples) if n > 0]\n\
128         \n\
129         def variance(data):\n\
130             if not data: return 1.0\n\
131             avg = sum(data)/len(data)\n\
132             return sum((x-avg)**2 for x in data)/len(data)\n\
133         \n\
134         v_n = variance(ratios_n)\n\
135         v_n2 = variance(ratios_n2)\n\
136         \n\
137         if v_n < v_n2: complexity = \"O(N)\"\n\
138         elif v_n2 < v_n: complexity = \"O(N^2)\"\n\
139         else: complexity = \"O(Unknown)\"\n\
140         \n\
141         print(f\"RESULT: Empirically detected {{complexity}}\")\n\
142         print(f\"STATS: n={{samples}}, times={{[f'{{t:.6f}}s' for t in times]}}\")\n",
143        snippet.replace("\n", "\n    ")
144    );
145
146    execute_in_sandbox(&python_script).await
147}
148
149async fn execute_in_sandbox(script: &str) -> Result<String, String> {
150    let sandbox_args = serde_json::json!({
151        "language": "python",
152        "code": script
153    });
154
155    crate::tools::code_sandbox::execute(&sandbox_args).await
156}
157
158async fn manage_ledger(args: &Value) -> Result<String, String> {
159    let action = args["action"]
160        .as_str()
161        .ok_or("Missing 'action' (read, append)")?;
162    let ledger_path = std::path::Path::new(".hematite/docs/scientific_ledger.md");
163
164    if let Some(parent) = ledger_path.parent() {
165        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
166    }
167
168    match action {
169        "read" => {
170            if !ledger_path.exists() {
171                return Ok("Scientific Ledger is currently empty.".to_string());
172            }
173            std::fs::read_to_string(ledger_path).map_err(|e| e.to_string())
174        }
175        "append" => {
176            let content = args["content"]
177                .as_str()
178                .ok_or("Missing 'content' to append")?;
179            let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
180            let entry = format!(
181                "\n### [{}] Scientific Derivation\n{}\n---\n",
182                timestamp, content
183            );
184
185            use std::io::Write;
186            let mut file = std::fs::OpenOptions::new()
187                .create(true)
188                .append(true)
189                .open(ledger_path)
190                .map_err(|e| e.to_string())?;
191
192            file.write_all(entry.as_bytes())
193                .map_err(|e| e.to_string())?;
194            Ok("Derivation successfully persisted to Scientific Ledger (RAG-indexed).".to_string())
195        }
196        _ => Err(format!("Unknown ledger action: {}", action)),
197    }
198}
199
200async fn calculate_on_dataset(args: &Value) -> Result<String, String> {
201    let path_str = args["path"].as_str().ok_or("Missing 'path' to dataset")?;
202    let sql = args["sql"].as_str().ok_or("Missing 'sql' to fetch data")?;
203    let python_op = args["python_op"]
204        .as_str()
205        .ok_or("Missing 'python_op' (e.g. 'sum(vals)/len(vals)')")?;
206
207    let path = std::path::PathBuf::from(path_str);
208
209    let data = crate::tools::data_query::query_to_json_helper(&path, sql).await?;
210    let data_json = serde_json::to_string(&data).map_err(|e| e.to_string())?;
211
212    let python_script = format!(
213        "import math\n\
214         data = {}\n\
215         vals = []\n\
216         for row in data:\n\
217             for v in row.values():\n\
218                 try: vals.append(float(v))\n\
219                 except: pass\n\
220         \n\
221         try:\n\
222             result = {}\n\
223             print(f\"RESULT: {{result}}\")\n\
224         except Exception as e:\n\
225             print(f\"ERROR: {{e}}\")\n",
226        data_json, python_op
227    );
228
229    execute_in_sandbox(&python_script).await
230}