use anyhow::{anyhow, Result};
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::mpsc::UnboundedSender;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::oneshot;
use crate::core::graph::{ClassDef, WorkflowGraph};
use crate::core::instance_arena::{InstanceArena, InstanceId, MethodScope, TypedSlot};
use crate::core::jvalue::JValue;
/// Starts that have been recorded but not yet completed, keyed by node id.
/// Each value is `(tool name, string params, start Instant)`; entries are inserted by
/// `emit_tool_start` / `emit_node_start` and removed by the matching `*_complete` call.
#[cfg(not(target_arch = "wasm32"))]
type PendingToolStarts = Arc<Mutex<HashMap<String, (String, HashMap<String, String>, Instant)>>>;
/// wasm32 variant of the pending-start map: same layout, but without the start `Instant`.
#[cfg(target_arch = "wasm32")]
type PendingToolStarts = Arc<Mutex<HashMap<String, (String, HashMap<String, String>)>>>;
/// A tool result delivered back through the `result_tx` channel of
/// `WorkflowEvent::ToolCall`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolResultPayload {
/// Identifier of the tool call this result answers.
/// NOTE(review): presumably matches an id inside the request's `tools` — confirm against the caller.
pub tool_call_id: String,
/// Result content as text.
pub content: String,
}
/// One completed tool/node invocation, appended to the trace by
/// `emit_tool_complete` and `emit_node_complete`.
#[derive(Debug, Clone, Serialize)]
pub struct ToolTraceEntry {
/// Id of the node that ran.
pub node_id: String,
/// Tool name passed to the completion call.
pub tool: String,
/// Parameters captured at start time; empty when no matching start was recorded.
pub params: HashMap<String, String>,
/// Value returned on success; always `None` on error.
pub result: Option<Value>,
/// Time elapsed since the matching start; `Duration::ZERO` when no start was
/// recorded, and always zero on wasm32.
pub duration: Duration,
/// Outcome of the invocation.
pub status: TraceStatus,
}
/// Outcome recorded in a `ToolTraceEntry`.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum TraceStatus {
/// The invocation returned `Ok`.
Success,
/// The invocation returned `Err`; carries the error rendered with `to_string()`.
Error(String),
}
/// Payload of `WorkflowEvent::ToolStart`, built by `emit_tool_start`.
#[derive(Debug, Clone, Serialize)]
pub struct ToolStartEvent {
/// Id of the node starting the tool.
pub node_id: String,
/// Tool name.
pub tool: String,
/// Parsed tool parameters when the tool event level is 2 or higher;
/// an empty JSON object otherwise (or when the parameters fail to parse).
pub params: Value,
}
/// Payload of `WorkflowEvent::ToolComplete`, built by `emit_tool_complete`.
#[derive(Debug, Clone, Serialize)]
pub struct ToolCompleteEvent {
/// Id of the node that ran the tool.
pub node_id: String,
/// Tool name.
pub tool: String,
/// Either `"success"` or `"error"`.
pub status: String,
/// Tool result; only populated on success at tool event level 2 or higher.
/// Omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
/// Error text; only populated on failure at tool event level 2 or higher.
/// Omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Payload of `WorkflowEvent::NodeStart`, built by `emit_node_start` when node
/// event streaming is enabled.
#[derive(Debug, Clone, Serialize)]
pub struct NodeStartEvent {
/// Id of the node being started.
pub node_id: String,
/// Tool name associated with the node.
pub tool: String,
/// Copy of the parameters passed to `emit_node_start`.
pub params: HashMap<String, String>,
}
/// Payload of `WorkflowEvent::NodeComplete`, built by `emit_node_complete` when
/// node event streaming is enabled.
#[derive(Debug, Clone, Serialize)]
pub struct NodeCompleteEvent {
/// Id of the node that finished.
pub node_id: String,
/// Tool name associated with the node.
pub tool: String,
/// Either `"success"` or `"error"`.
pub status: String,
/// Node result on success; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
/// Error text on failure; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Events pushed through the context's event channel (see `WorkflowContext::emit`).
pub enum WorkflowEvent {
/// A text token; produced by the adapter returned from `get_token_sender_adapter`.
Token(String),
/// Status text; emitted by `WorkflowContext::set` when `reply.status` is set to a string.
Status(String),
/// Error text. Not emitted anywhere in this file.
Error(String),
/// Arbitrary metadata; produced by the adapter returned from `get_meta_sender_adapter`.
Meta(Value),
/// Request sent by `emit_tool_call_and_wait`; the receiver answers through `result_tx`.
#[cfg(not(target_arch = "wasm32"))]
ToolCall {
/// Identifier of this call, echoed in timeout / channel-dropped error messages.
call_id: String,
/// Tool descriptions forwarded unchanged from the caller.
tools: Vec<Value>,
/// One-shot channel on which the results (and an optional list of extra values) are returned.
/// NOTE(review): meaning of the optional `Vec<Value>` is not visible here — confirm with the receiver.
result_tx: oneshot::Sender<(Vec<ToolResultPayload>, Option<Vec<Value>>)>,
},
/// A tool invocation started (see `emit_tool_start`).
ToolStart(ToolStartEvent),
/// A tool invocation finished (see `emit_tool_complete`).
ToolComplete(ToolCompleteEvent),
/// A node started (see `emit_node_start`).
NodeStart(NodeStartEvent),
/// A node finished (see `emit_node_complete`).
NodeComplete(NodeCompleteEvent),
/// A yielded value. Not emitted anywhere in this file.
Yield(Value),
}
/// Shared state for a workflow execution: the JSON data document, the event
/// channel, the execution stack, tracing state, and class/instance bookkeeping.
///
/// Most fields are `Arc`-wrapped, so `Clone` produces a handle onto the same
/// state; `fork` instead snapshots `data` and `typed_store` and starts with an
/// empty method-scope stack.
#[derive(Debug, Clone)]
pub struct WorkflowContext {
/// Root JSON document addressed by dotted paths (`set` / `resolve_path`).
data: Arc<RwLock<Value>>,
/// Channel events are sent on; `None` means `emit` drops events.
#[cfg(not(target_arch = "wasm32"))]
event_sender: Option<UnboundedSender<WorkflowEvent>>,
/// Identifiers currently being executed, used for depth and cycle checks.
execution_stack: Arc<Mutex<Vec<String>>>,
/// Maximum length of `execution_stack` accepted by `enter_execution`.
max_depth: usize,
/// Workflow most recently passed to `set_current_workflow`.
current_workflow: Arc<RwLock<Option<Arc<WorkflowGraph>>>>,
/// First workflow passed to `set_root_workflow`; later calls do not replace it.
root_workflow: Arc<RwLock<Option<Arc<WorkflowGraph>>>>,
/// Tool event verbosity: 0 = no tool events, 1 = events without detail, 2+ = events with params/result/error.
tool_event_level: Arc<AtomicU8>,
/// Whether `NodeStart` / `NodeComplete` events are emitted.
stream_node_events: Arc<AtomicBool>,
/// Completed invocations recorded by the `*_complete` methods.
tool_trace: Arc<Mutex<Vec<ToolTraceEntry>>>,
/// Starts awaiting their completion, keyed by node id.
pending_tool_starts: PendingToolStarts,
/// Class definitions by class name, used to resolve fields of class-shaped values.
class_registry: Arc<RwLock<HashMap<String, Arc<ClassDef>>>>,
/// Storage for allocated class instances.
instance_arena: InstanceArena,
/// Stack of active method scopes; the last entry shadows `data` for reads and dotless writes.
method_scopes: Arc<RwLock<Vec<MethodScope>>>,
/// Typed mirror of values written through `set` / `set_typed`, keyed by full path.
typed_store: Arc<RwLock<HashMap<String, TypedSlot>>>,
}
impl Default for WorkflowContext {
fn default() -> Self {
Self::new()
}
}
impl WorkflowContext {
pub fn new() -> Self {
Self {
data: Arc::new(RwLock::new(json!({}))),
#[cfg(not(target_arch = "wasm32"))]
event_sender: None,
execution_stack: Arc::new(Mutex::new(Vec::new())),
max_depth: 10,
current_workflow: Arc::new(RwLock::new(None)),
root_workflow: Arc::new(RwLock::new(None)),
tool_event_level: Arc::new(AtomicU8::new(0)),
stream_node_events: Arc::new(AtomicBool::new(false)),
tool_trace: Arc::new(Mutex::new(Vec::new())),
pending_tool_starts: Arc::new(Mutex::new(HashMap::new())),
class_registry: Arc::new(RwLock::new(HashMap::new())),
instance_arena: InstanceArena::new(),
method_scopes: Arc::new(RwLock::new(Vec::new())),
typed_store: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Overrides the maximum execution-stack depth enforced by `enter_execution`.
pub fn _set_max_depth(&mut self, max_depth: usize) {
self.max_depth = max_depth;
}
#[cfg(not(target_arch = "wasm32"))]
pub fn with_sender(sender: UnboundedSender<WorkflowEvent>) -> Self {
Self {
data: Arc::new(RwLock::new(json!({}))),
event_sender: Some(sender),
execution_stack: Arc::new(Mutex::new(Vec::new())),
max_depth: 10,
current_workflow: Arc::new(RwLock::new(None)),
root_workflow: Arc::new(RwLock::new(None)),
tool_event_level: Arc::new(AtomicU8::new(0)),
stream_node_events: Arc::new(AtomicBool::new(false)),
tool_trace: Arc::new(Mutex::new(Vec::new())),
pending_tool_starts: Arc::new(Mutex::new(HashMap::new())),
class_registry: Arc::new(RwLock::new(HashMap::new())),
instance_arena: InstanceArena::new(),
method_scopes: Arc::new(RwLock::new(Vec::new())),
typed_store: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Creates a child context.
///
/// The child gets its own snapshot of `data` and `typed_store` and an empty
/// method-scope stack. The event sender, execution stack, workflows, event
/// settings, trace, pending starts and class registry are shared with `self`
/// through their `Arc`s; the instance arena is obtained via its `Clone` impl.
pub fn fork(&self) -> Self {
    // Independent copies: writes in the fork do not show up in the parent.
    let data_snapshot = self.data.read().clone();
    let typed_snapshot = self.typed_store.read().clone();

    Self {
        data: Arc::new(RwLock::new(data_snapshot)),
        typed_store: Arc::new(RwLock::new(typed_snapshot)),
        // Method scopes are never inherited.
        method_scopes: Arc::new(RwLock::new(Vec::new())),
        max_depth: self.max_depth,
        #[cfg(not(target_arch = "wasm32"))]
        event_sender: self.event_sender.clone(),
        // Shared handles.
        execution_stack: Arc::clone(&self.execution_stack),
        current_workflow: Arc::clone(&self.current_workflow),
        root_workflow: Arc::clone(&self.root_workflow),
        tool_event_level: Arc::clone(&self.tool_event_level),
        stream_node_events: Arc::clone(&self.stream_node_events),
        tool_trace: Arc::clone(&self.tool_trace),
        pending_tool_starts: Arc::clone(&self.pending_tool_starts),
        class_registry: Arc::clone(&self.class_registry),
        instance_arena: self.instance_arena.clone(),
    }
}
/// Pushes `identifier` onto the execution stack.
///
/// # Errors
/// Fails when the stack already holds `max_depth` entries, or when
/// `identifier` is already on the stack (circular execution). The depth
/// check runs first. The stack is left untouched on error.
pub fn enter_execution(&self, identifier: String) -> Result<()> {
    let mut frames = self.execution_stack.lock();
    let limit = self.max_depth;

    if frames.len() >= limit {
        return Err(anyhow!(
            "Maximum execution depth ({}) exceeded. Current stack: {:?}",
            limit,
            frames
        ));
    }

    let already_running = frames.iter().any(|frame| *frame == identifier);
    if already_running {
        return Err(anyhow!(
            "Circular execution detected: '{}' is already in the call stack: {:?}",
            identifier,
            frames
        ));
    }

    frames.push(identifier);
    Ok(())
}
/// Pops the most recent entry from the execution stack.
///
/// # Errors
/// Fails when the stack is already empty.
pub fn exit_execution(&self) -> Result<()> {
    match self.execution_stack.lock().pop() {
        Some(_) => Ok(()),
        None => Err(anyhow!("Execution stack is already empty")),
    }
}
/// Returns a copy of the current execution stack, oldest entry first.
/// Always `Ok`.
pub fn _get_execution_stack(&self) -> Result<Vec<String>> {
    let snapshot = self.execution_stack.lock().to_vec();
    Ok(snapshot)
}
/// Returns the number of entries currently on the execution stack.
/// Always `Ok`.
pub fn _get_execution_depth(&self) -> Result<usize> {
    let depth = self.execution_stack.lock().len();
    Ok(depth)
}
/// Sends `event` on the event channel, if one is configured.
///
/// The event is dropped when there is no sender; a failed send is ignored.
#[cfg(not(target_arch = "wasm32"))]
pub fn emit(&self, event: WorkflowEvent) {
    match self.event_sender.as_ref() {
        Some(channel) => {
            // Best effort: a failed send is not an error for the workflow.
            let _ = channel.send(event);
        }
        None => {}
    }
}
/// wasm32 build: there is no event channel, so the event is discarded.
#[cfg(target_arch = "wasm32")]
pub fn emit(&self, _event: WorkflowEvent) {}
/// Returns `true` when an event sender is configured, i.e. `emit` forwards events.
#[cfg(not(target_arch = "wasm32"))]
pub fn has_event_sender(&self) -> bool {
self.event_sender.is_some()
}
/// wasm32 build: there is never an event sender.
#[cfg(target_arch = "wasm32")]
pub fn has_event_sender(&self) -> bool {
false
}
/// Emits a [`WorkflowEvent::ToolCall`] and waits up to `timeout_secs` seconds
/// for the receiver to answer through the event's one-shot channel.
///
/// # Errors
/// * timeout: no answer arrived within `timeout_secs`;
/// * channel dropped: the sending half was dropped without an answer. `emit`
///   discards the event when no event sender is configured, which drops the
///   sending half as well.
#[cfg(not(target_arch = "wasm32"))]
pub async fn emit_tool_call_and_wait(
    &self,
    call_id: String,
    tools: Vec<Value>,
    timeout_secs: u64,
) -> Result<(Vec<ToolResultPayload>, Option<Vec<Value>>)> {
    let (result_tx, result_rx) = oneshot::channel();
    let request = WorkflowEvent::ToolCall {
        call_id: call_id.clone(),
        tools,
        result_tx,
    };
    self.emit(request);

    let limit = Duration::from_secs(timeout_secs);
    match tokio::time::timeout(limit, result_rx).await {
        Ok(Ok(answer)) => Ok(answer),
        Ok(Err(_)) => Err(anyhow!(
            "Client tool result channel dropped (call_id: {})",
            call_id
        )),
        Err(_) => Err(anyhow!(
            "Client tool execution timed out after {}s (call_id: {})",
            timeout_secs,
            call_id
        )),
    }
}
/// Returns a `String` sender whose messages are forwarded to the event
/// channel as [`WorkflowEvent::Token`], or `None` when no event sender is
/// configured.
///
/// A forwarding task is spawned with `tokio::spawn`; it ends once every clone
/// of the returned sender has been dropped.
#[cfg(not(target_arch = "wasm32"))]
pub fn get_token_sender_adapter(&self) -> Option<UnboundedSender<String>> {
    let events = match self.event_sender.clone() {
        Some(events) => events,
        None => return None,
    };
    let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    let forward = async move {
        while let Some(token) = token_rx.recv().await {
            // Send failures are ignored.
            let _ = events.send(WorkflowEvent::Token(token));
        }
    };
    tokio::spawn(forward);
    Some(token_tx)
}
/// Returns a `Value` sender whose messages are forwarded to the event channel
/// as [`WorkflowEvent::Meta`], or `None` when no event sender is configured.
///
/// A forwarding task is spawned with `tokio::spawn`; it ends once every clone
/// of the returned sender has been dropped.
#[cfg(not(target_arch = "wasm32"))]
pub fn get_meta_sender_adapter(&self) -> Option<UnboundedSender<Value>> {
    let events = match self.event_sender.clone() {
        Some(events) => events,
        None => return None,
    };
    let (meta_tx, mut meta_rx) = tokio::sync::mpsc::unbounded_channel::<Value>();
    let forward = async move {
        while let Some(meta) = meta_rx.recv().await {
            // Send failures are ignored.
            let _ = events.send(WorkflowEvent::Meta(meta));
        }
    };
    tokio::spawn(forward);
    Some(meta_tx)
}
/// Replaces the whole class registry with a copy of `classes`.
pub fn set_class_registry(&self, classes: &HashMap<String, Arc<ClassDef>>) {
    // Copy first so the write lock is only held for the swap itself.
    let replacement = classes.clone();
    let mut registry = self.class_registry.write();
    *registry = replacement;
}
/// Borrows the instance arena backing this context.
pub fn arena(&self) -> &InstanceArena {
&self.instance_arena
}
/// Allocates an instance in the arena and stores a reference marker
/// (`{"__arena_ref__": <id>}`) at `path` via [`Self::set`].
///
/// # Errors
/// Propagates any error from `set`. The arena allocation has already happened
/// by then and is not rolled back.
pub fn alloc_instance(
    &self,
    path: String,
    class_name: String,
    class_def: Arc<ClassDef>,
    fields: Vec<Value>,
) -> Result<InstanceId> {
    let arena_key = path.clone();
    let id = self
        .instance_arena
        .alloc(arena_key, class_name, class_def, fields);
    let marker = json!({"__arena_ref__": id.0});
    self.set(path, marker).map(|()| id)
}
/// Finds the instance id registered under `name`.
///
/// The arena's own name lookup is tried first; otherwise the top-level key
/// `name` of the data document is checked for an `__arena_ref__` marker.
///
/// # Errors
/// Fails when neither lookup yields an id.
pub fn lookup_instance(&self, name: &str) -> Result<InstanceId> {
    if let Some(id) = self.instance_arena.lookup_by_name(name) {
        return Ok(id);
    }
    let document = self.data.read();
    let from_marker = document.get(name).and_then(Self::resolve_arena_ref);
    match from_marker {
        Some(id) => Ok(id),
        None => Err(anyhow!("Instance '{}' not found in arena", name)),
    }
}
/// Pushes `scope` as the innermost method scope. Always `Ok`.
pub fn push_method_scope(&self, scope: MethodScope) -> Result<()> {
    let mut stack = self.method_scopes.write();
    stack.push(scope);
    Ok(())
}
/// Removes and returns the innermost method scope, or `None` when no scope is
/// active. Always `Ok`.
pub fn pop_method_scope(&self) -> Result<Option<MethodScope>> {
    let innermost = self.method_scopes.write().pop();
    Ok(innermost)
}
/// Replaces `param_values` on the innermost method scope.
///
/// Does nothing (and still returns `Ok`) when no scope is active.
pub fn set_method_param_values(&self, values: Vec<TypedSlot>) -> Result<()> {
    if let Some(innermost) = self.method_scopes.write().last_mut() {
        innermost.param_values = values;
    }
    Ok(())
}
/// Runs `f` against the innermost method scope and returns its result, or
/// `None` when no scope is active.
///
/// The read lock on the scope stack is held while `f` runs, so `f` must not
/// call anything that takes the write lock on it (for example `set` with a
/// dotless path, or `push_method_scope`).
pub fn with_method_scope<T>(&self, f: impl FnOnce(&MethodScope) -> T) -> Option<T> {
    self.method_scopes.read().last().map(f)
}
/// Writes the scope's dirty values that correspond to declared class fields
/// back to the arena instance in one batch.
///
/// Dirty entries whose name is not in the class's field index are skipped.
/// Nothing is written when the dirty set is empty.
pub fn flush_dirty_to_arena(&self, scope: &MethodScope) {
    if scope.dirty.is_empty() {
        return;
    }
    let field_index = &scope.class_def.field_index;
    let mut updates: Vec<(usize, Value)> = Vec::new();
    for (name, val) in scope.dirty.iter() {
        if let Some(&slot) = field_index.get(name.as_str()) {
            updates.push((slot, val.clone()));
        }
    }
    self.instance_arena
        .set_fields_batch(scope.instance_id, &updates);
}
fn resolve_arena_ref(val: &Value) -> Option<InstanceId> {
val.as_object()
.and_then(|m| m.get("__arena_ref__"))
.and_then(|v| v.as_u64())
.map(InstanceId)
}
/// Records `workflow` as the current workflow, replacing any previous one.
pub fn set_current_workflow(&self, workflow: Arc<WorkflowGraph>) {
    let mut current = self.current_workflow.write();
    *current = Some(workflow);
}
/// Returns a handle to the current workflow, if one has been set.
pub fn get_current_workflow(&self) -> Option<Arc<WorkflowGraph>> {
    let current = self.current_workflow.read();
    current.as_ref().map(Arc::clone)
}
/// Records `workflow` as the root workflow, but only the first time: once a
/// root is set, later calls leave it unchanged.
pub fn set_root_workflow(&self, workflow: Arc<WorkflowGraph>) {
    let mut root = self.root_workflow.write();
    if root.is_some() {
        return;
    }
    *root = Some(workflow);
}
/// Returns a handle to the root workflow, if one has been set.
pub fn get_root_workflow(&self) -> Option<Arc<WorkflowGraph>> {
    let root = self.root_workflow.read();
    root.as_ref().map(Arc::clone)
}
/// Sets the tool event verbosity read by `emit_tool_start` / `emit_tool_complete`:
/// 0 emits no tool events, 1 emits events without params/result/error detail,
/// 2 or higher includes that detail.
pub fn set_tool_event_level(&self, level: u8) {
self.tool_event_level.store(level, Ordering::Relaxed);
}
/// Enables or disables emission of `NodeStart` / `NodeComplete` events.
/// Trace recording in `emit_node_complete` is not affected by this flag.
pub fn set_stream_node_events(&self, enabled: bool) {
self.stream_node_events.store(enabled, Ordering::Relaxed);
}
pub fn emit_tool_start(&self, node_id: &str, tool: &str, params_json: &str) {
let params_map: HashMap<String, String> =
serde_json::from_str(params_json).unwrap_or_default();
#[cfg(not(target_arch = "wasm32"))]
self.pending_tool_starts.lock().insert(
node_id.to_string(),
(tool.to_string(), params_map, Instant::now()),
);
#[cfg(target_arch = "wasm32")]
self.pending_tool_starts
.lock()
.insert(node_id.to_string(), (tool.to_string(), params_map));
let level = self.tool_event_level.load(Ordering::Relaxed);
if level == 0 {
return;
}
let event_params = if level >= 2 {
serde_json::from_str::<Value>(params_json).unwrap_or(json!({}))
} else {
json!({})
};
self.emit(WorkflowEvent::ToolStart(ToolStartEvent {
node_id: node_id.to_string(),
tool: tool.to_string(),
params: event_params,
}));
}
pub fn emit_tool_complete(&self, node_id: &str, tool: &str, result: &Result<Option<Value>>) {
let start_info = self.pending_tool_starts.lock().remove(node_id);
#[cfg(not(target_arch = "wasm32"))]
let (params, duration) = match start_info {
Some((_, params, started)) => (params, started.elapsed()),
None => (HashMap::new(), Duration::ZERO),
};
#[cfg(target_arch = "wasm32")]
let (params, duration) = match start_info {
Some((_, params)) => (params, Duration::ZERO),
None => (HashMap::new(), Duration::ZERO),
};
let (trace_result, trace_status) = match result {
Ok(val) => (val.clone(), TraceStatus::Success),
Err(e) => (None, TraceStatus::Error(e.to_string())),
};
let entry = ToolTraceEntry {
node_id: node_id.to_string(),
tool: tool.to_string(),
params,
result: trace_result,
duration,
status: trace_status,
};
self.tool_trace.lock().push(entry);
let level = self.tool_event_level.load(Ordering::Relaxed);
if level == 0 {
return;
}
let (status, evt_result, evt_error) = match result {
Ok(val) => (
"success".to_string(),
if level >= 2 { val.clone() } else { None },
None,
),
Err(e) => (
"error".to_string(),
None,
if level >= 2 {
Some(e.to_string())
} else {
None
},
),
};
self.emit(WorkflowEvent::ToolComplete(ToolCompleteEvent {
node_id: node_id.to_string(),
tool: tool.to_string(),
status,
result: evt_result,
error: evt_error,
}));
}
/// Records the start of a node in the pending map (keyed by `node_id`) and,
/// when node event streaming is enabled, emits a [`WorkflowEvent::NodeStart`]
/// carrying a copy of `params`.
pub fn emit_node_start(&self, node_id: &str, tool: &str, params: &HashMap<String, String>) {
    let key = node_id.to_string();
    let tool_name = tool.to_string();
    {
        let mut pending = self.pending_tool_starts.lock();
        #[cfg(not(target_arch = "wasm32"))]
        pending.insert(
            key.clone(),
            (tool_name.clone(), params.clone(), Instant::now()),
        );
        #[cfg(target_arch = "wasm32")]
        pending.insert(key.clone(), (tool_name.clone(), params.clone()));
    }

    let streaming = self.stream_node_events.load(Ordering::Relaxed);
    if streaming {
        self.emit(WorkflowEvent::NodeStart(NodeStartEvent {
            node_id: key,
            tool: tool_name,
            params: params.clone(),
        }));
    }
}
pub fn emit_node_complete(&self, node_id: &str, tool: &str, result: &Result<Option<Value>>) {
let start_info = self.pending_tool_starts.lock().remove(node_id);
#[cfg(not(target_arch = "wasm32"))]
let (params, duration) = match start_info {
Some((_, params, started)) => (params, started.elapsed()),
None => (HashMap::new(), Duration::ZERO),
};
#[cfg(target_arch = "wasm32")]
let (params, duration) = match start_info {
Some((_, params)) => (params, Duration::ZERO),
None => (HashMap::new(), Duration::ZERO),
};
let (trace_result, trace_status) = match result {
Ok(val) => (val.clone(), TraceStatus::Success),
Err(e) => (None, TraceStatus::Error(e.to_string())),
};
let entry = ToolTraceEntry {
node_id: node_id.to_string(),
tool: tool.to_string(),
params,
result: trace_result,
duration,
status: trace_status,
};
self.tool_trace.lock().push(entry);
if !self.stream_node_events.load(Ordering::Relaxed) {
return;
}
match result {
Ok(val) => self.emit(WorkflowEvent::NodeComplete(NodeCompleteEvent {
node_id: node_id.to_string(),
tool: tool.to_string(),
status: "success".to_string(),
result: val.clone(),
error: None,
})),
Err(e) => self.emit(WorkflowEvent::NodeComplete(NodeCompleteEvent {
node_id: node_id.to_string(),
tool: tool.to_string(),
status: "error".to_string(),
result: None,
error: Some(e.to_string()),
})),
}
}
/// Returns a copy of every trace entry recorded so far, in recording order.
pub fn trace_entries(&self) -> Vec<ToolTraceEntry> {
    let recorded = self.tool_trace.lock();
    recorded.to_vec()
}
/// Returns copies of the trace entries whose tool name equals `tool_name`,
/// in recording order.
#[allow(dead_code)]
pub fn trace_tool_called(&self, tool_name: &str) -> Vec<ToolTraceEntry> {
    let recorded = self.tool_trace.lock();
    let mut matching = Vec::new();
    for entry in recorded.iter() {
        if entry.tool == tool_name {
            matching.push(entry.clone());
        }
    }
    matching
}
/// Returns the sum of the durations of all recorded trace entries.
#[allow(dead_code)]
pub fn trace_total_duration(&self) -> Duration {
    let recorded = self.tool_trace.lock();
    recorded.iter().map(|entry| entry.duration).sum()
}
/// Discards all recorded trace entries and all pending (not yet completed) starts.
pub fn _clear_trace(&self) {
self.tool_trace.lock().clear();
self.pending_tool_starts.lock().clear();
}
/// Writes `value` at the dotted `path`.
///
/// Behaviour, in order:
/// 1. Setting `reply.status` to a string also emits a
///    [`WorkflowEvent::Status`] with that string.
/// 2. A dotless `path` written while a method scope is active goes to the
///    innermost scope instead of the data document: it is recorded in the
///    scope's dirty set and, when it names a class field, mirrored into the
///    scope's field cache.
/// 3. Otherwise the value is stored in the data document, creating missing
///    intermediate objects, and the typed store is updated: scalars
///    (number, bool, string, null) are mirrored, anything else removes the
///    typed entry for `path`.
///
/// # Errors
/// Fails when an existing segment along `path` is not a JSON object. The
/// typed store is only touched after the document write succeeded, so a
/// failed write cannot leave a typed entry behind that disagrees with the
/// document.
pub fn set(&self, path: String, value: Value) -> Result<()> {
    if path == "reply.status" {
        if let Some(s) = value.as_str() {
            self.emit(WorkflowEvent::Status(s.to_string()));
        }
    }

    if !path.contains('.') {
        let mut scopes = self.method_scopes.write();
        if let Some(scope) = scopes.last_mut() {
            // Declared fields are additionally mirrored into the typed cache.
            if let Some(&idx) = scope.class_def.field_index.get(&path) {
                if idx < scope.field_cache.len() {
                    scope.field_cache[idx] = TypedSlot::from_value(value.clone());
                }
            }
            scope.dirty.insert(path, value);
            return Ok(());
        }
    }

    // Decide the typed mirror up front; `value` is moved into the document below.
    let typed = match &value {
        Value::Number(_) | Value::Bool(_) | Value::String(_) | Value::Null => {
            Some(TypedSlot::from_value(value.clone()))
        }
        _ => None,
    };

    {
        let mut data = self.data.write();
        let parts: Vec<&str> = path.split('.').collect();
        let (last_key, parent_parts) = parts
            .split_last()
            .ok_or_else(|| anyhow!("Cannot set a value with an empty path"))?;
        let mut current = &mut *data;
        for part in parent_parts {
            current = current
                .as_object_mut()
                .ok_or_else(|| anyhow!("Path part '{}' is not an object", part))?
                .entry(part.to_string())
                .or_insert_with(|| json!({}));
        }
        match current.as_object_mut() {
            Some(obj) => {
                obj.insert(last_key.to_string(), value);
            }
            None => return Err(anyhow!("Final path segment is not an object")),
        }
    }

    // The document write succeeded; bring the typed store in line with it.
    let mut typed_store = self.typed_store.write();
    match typed {
        Some(slot) => {
            typed_store.insert(path, slot);
        }
        None => {
            typed_store.remove(&path);
        }
    }
    Ok(())
}
/// Writes an already-typed `slot` at the dotted `path`.
///
/// A dotless `path` written while a method scope is active goes to the
/// innermost scope: the JSON form is recorded in the scope's dirty set and,
/// when the path names a class field, the slot is stored in the field cache.
/// Otherwise the JSON form is stored in the data document (creating missing
/// intermediate objects) and the slot is stored in the typed store.
///
/// # Errors
/// Fails when an existing segment along `path` is not a JSON object. The
/// typed store is only touched after the document write succeeded, so a
/// failed write cannot leave a typed entry behind that disagrees with the
/// document.
#[allow(dead_code)]
pub fn set_typed(&self, path: String, slot: TypedSlot) -> Result<()> {
    let value = slot.to_value();

    if !path.contains('.') {
        let mut scopes = self.method_scopes.write();
        if let Some(scope) = scopes.last_mut() {
            // Declared fields keep the typed slot in the cache as well.
            if let Some(&idx) = scope.class_def.field_index.get(&path) {
                if idx < scope.field_cache.len() {
                    scope.field_cache[idx] = slot;
                }
            }
            scope.dirty.insert(path, value);
            return Ok(());
        }
    }

    {
        let mut data = self.data.write();
        let parts: Vec<&str> = path.split('.').collect();
        let (last_key, parent_parts) = parts
            .split_last()
            .ok_or_else(|| anyhow!("Cannot set a value with an empty path"))?;
        let mut current = &mut *data;
        for part in parent_parts {
            current = current
                .as_object_mut()
                .ok_or_else(|| anyhow!("Path part '{}' is not an object", part))?
                .entry(part.to_string())
                .or_insert_with(|| json!({}));
        }
        match current.as_object_mut() {
            Some(obj) => {
                obj.insert(last_key.to_string(), value);
            }
            None => return Err(anyhow!("Final path segment is not an object")),
        }
    }

    // The document write succeeded; record the typed form.
    self.typed_store.write().insert(path, slot);
    Ok(())
}
/// Resolves the dotted `path` to a JSON value.
///
/// Lookup order:
/// 1. the innermost method scope, if any (see `resolve_in_method_scope`);
/// 2. the data document. A value that is an arena reference marker is
///    followed into the instance arena; class-shaped values are resolved with
///    the class registry.
///
/// Returns `Ok(None)` when the root key is missing or when a multi-segment
/// walk hits `null`. A single trailing segment that is missing yields
/// `Ok(Some(Value::Null))`. This function never returns `Err`.
pub fn resolve_path(&self, path: &str) -> Result<Option<Value>> {
    {
        let scopes = self.method_scopes.read();
        if let Some(scope) = scopes.last() {
            if let Some(result) = self.resolve_in_method_scope(scope, path) {
                return Ok(Some(result));
            }
        }
    }

    let data = self.data.read();
    let (first, rest) = match path.split_once('.') {
        Some(pair) => pair,
        None => {
            // Dotless path: a top-level key, possibly pointing into the arena.
            let val = data.get(path);
            if let Some(id) = val.and_then(Self::resolve_arena_ref) {
                drop(data);
                return Ok(self.instance_arena.materialize(id));
            }
            return Ok(val.cloned());
        }
    };

    let root = match data.get(first) {
        Some(v) => v,
        None => return Ok(None),
    };
    if let Some(id) = Self::resolve_arena_ref(root) {
        drop(data);
        return Ok(self.resolve_arena_field(id, rest));
    }

    // Taken only after the arena branch: `resolve_arena_field` acquires this
    // read lock itself, and a recursive read on a parking_lot `RwLock` can
    // deadlock when a writer is queued in between.
    let registry_guard = self.class_registry.read();
    let registry = Some(&*registry_guard);
    if !rest.contains('.') {
        return Ok(Some(resolve_field(root, rest, registry)));
    }
    let mut current = root.clone();
    for part in rest.split('.') {
        current = resolve_field(&current, part, registry);
        if current.is_null() {
            return Ok(None);
        }
    }
    Ok(Some(current))
}
/// Resolves `path` to a typed slot.
///
/// With an active method scope, a bare name or `self.<name>` is looked up in
/// the scope first: a dirty value wins (converted to a slot), then a declared
/// field's cache entry. When the name is a declared field but its cache entry
/// is missing, `None` is returned without consulting the typed store. Any
/// other path, or a name unknown to the scope, falls back to the typed store
/// keyed by the full `path`.
pub fn resolve_path_typed(&self, path: &str) -> Option<TypedSlot> {
    {
        let scopes = self.method_scopes.read();
        if let Some(scope) = scopes.last() {
            // The scope-local name this path addresses, if any.
            let scoped_name = match path.split_once('.') {
                None => Some(path),
                Some(("self", field)) if !field.is_empty() && !field.contains('.') => Some(field),
                Some(_) => None,
            };
            if let Some(name) = scoped_name {
                if let Some(pending) = scope.dirty.get(name) {
                    return Some(TypedSlot::from_value(pending.clone()));
                }
                if let Some(&slot_idx) = scope.class_def.field_index.get(name) {
                    return scope.field_cache.get(slot_idx).cloned();
                }
            }
        }
    }
    let store = self.typed_store.read();
    store.get(path).cloned()
}
/// Resolves `path` against a method scope, or returns `None` when the scope
/// does not know the path's root.
///
/// * `self` / `self.<...>` address the scope's own instance (with dirty
///   values applied).
/// * Any other root is looked up in the dirty set first, then among the
///   declared class fields (via the field cache).
/// * Remaining segments are walked with `resolve_field`. A single trailing
///   segment that is missing yields `Some(Value::Null)`, while a `null` met
///   during a multi-segment walk yields `None`.
fn resolve_in_method_scope(&self, scope: &MethodScope, path: &str) -> Option<Value> {
    let (first, rest) = match path.split_once('.') {
        Some(pair) => pair,
        None => {
            // Dotless path: the instance itself, a dirty value, or a cached field.
            if path == "self" {
                return self
                    .instance_arena
                    .materialize_with_dirty(scope.instance_id, &scope.dirty);
            }
            if let Some(val) = scope.dirty.get(path) {
                return Some(val.clone());
            }
            let idx = *scope.class_def.field_index.get(path)?;
            return scope.field_cache.get(idx).map(|s| s.to_value());
        }
    };

    if first == "self" {
        return self.resolve_self_field(scope, rest);
    }

    // Root value: a dirty write shadows the cached field value.
    let root = match scope.dirty.get(first) {
        Some(val) => val.clone(),
        None => {
            let idx = *scope.class_def.field_index.get(first)?;
            scope.field_cache.get(idx)?.to_value()
        }
    };

    let registry_guard = self.class_registry.read();
    let registry = Some(&*registry_guard);
    if !rest.contains('.') {
        return Some(resolve_field(&root, rest, registry));
    }
    let mut current = root;
    for part in rest.split('.') {
        current = resolve_field(&current, part, registry);
        if current.is_null() {
            return None;
        }
    }
    Some(current)
}
/// Resolves the part of a `self.<rest>` path that follows `self.`.
///
/// An empty `rest` materializes the whole instance with dirty values applied.
/// Otherwise the first segment must be a dirty name or a declared class field
/// (else `None`); a declared field without a cache entry resolves to `null`.
/// Remaining segments are walked with `resolve_field`. A single trailing
/// segment that is missing yields `Some(Value::Null)`, while a `null` met
/// during a multi-segment walk yields `None`.
fn resolve_self_field(&self, scope: &MethodScope, rest: &str) -> Option<Value> {
    if rest.is_empty() {
        return self
            .instance_arena
            .materialize_with_dirty(scope.instance_id, &scope.dirty);
    }

    let (field_name, further) = match rest.split_once('.') {
        Some((head, tail)) => (head, Some(tail)),
        None => (rest, None),
    };

    // A dirty write shadows the cached field value.
    let field_val = match scope.dirty.get(field_name) {
        Some(val) => val.clone(),
        None => {
            let idx = *scope.class_def.field_index.get(field_name)?;
            scope
                .field_cache
                .get(idx)
                .map(|s| s.to_value())
                .unwrap_or(Value::Null)
        }
    };

    let further = match further {
        Some(tail) => tail,
        None => return Some(field_val),
    };

    let registry_guard = self.class_registry.read();
    let registry = Some(&*registry_guard);
    if !further.contains('.') {
        return Some(resolve_field(&field_val, further, registry));
    }
    let mut current = field_val;
    for part in further.split('.') {
        current = resolve_field(&current, part, registry);
        if current.is_null() {
            return None;
        }
    }
    Some(current)
}
/// Resolves `rest` (the path after the root key) against arena instance `id`.
///
/// An empty `rest` materializes the whole instance. Otherwise the first
/// segment must be a declared class field or the pseudo-field `__class__`
/// (which yields the class name); anything else yields `None`. Remaining
/// segments are walked with `resolve_field`. A single trailing segment that
/// is missing yields `Some(Value::Null)`, while a `null` met during a
/// multi-segment walk yields `None`.
fn resolve_arena_field(&self, id: InstanceId, rest: &str) -> Option<Value> {
    if rest.is_empty() {
        return self.instance_arena.materialize(id);
    }
    let class_def = self.instance_arena.class_def(id)?;

    let (field_name, further) = match rest.split_once('.') {
        Some((head, tail)) => (head, Some(tail)),
        None => (rest, None),
    };

    let field_val = if let Some(&idx) = class_def.field_index.get(field_name) {
        self.instance_arena.get_field(id, idx)?
    } else if field_name == "__class__" {
        Value::String(self.instance_arena.class_name(id)?)
    } else {
        return None;
    };

    let further = match further {
        Some(tail) => tail,
        None => return Some(field_val),
    };

    let registry_guard = self.class_registry.read();
    let registry = Some(&*registry_guard);
    if !further.contains('.') {
        return Some(resolve_field(&field_val, further, registry));
    }
    let mut current = field_val;
    for part in further.split('.') {
        current = resolve_field(&current, part, registry);
        if current.is_null() {
            return None;
        }
    }
    Some(current)
}
/// Returns a copy of the whole data document. Always `Ok`.
pub fn get_as_value(&self) -> Result<Value> {
    let snapshot = self.data.read().clone();
    Ok(snapshot)
}
/// Resolves `path` and wraps the outcome in a [`JValue`]. A resolution error
/// is treated the same as a missing value.
pub fn get_jvalue(&self, path: &str) -> JValue {
    let resolved = match self.resolve_path(path) {
        Ok(found) => found,
        Err(_) => None,
    };
    JValue::from(resolved)
}
/// Resolves `path` via `get_jvalue` and returns the result of `JValue::string()`.
pub fn get_str(&self, path: &str) -> Option<String> {
self.get_jvalue(path).string()
}
/// Resolves `path` via `get_jvalue` and returns the result of `JValue::i64()`.
#[allow(dead_code)]
pub fn get_i64(&self, path: &str) -> Option<i64> {
self.get_jvalue(path).i64()
}
/// Resolves `path` via `get_jvalue` and returns the result of `JValue::f64()`.
#[allow(dead_code)]
pub fn get_f64(&self, path: &str) -> Option<f64> {
self.get_jvalue(path).f64()
}
}
fn resolve_field(
val: &Value,
field: &str,
class_registry: Option<&HashMap<String, Arc<ClassDef>>>,
) -> Value {
match val {
Value::Object(map) => {
if map.contains_key("__arena_ref__") && !map.contains_key("__class__") {
return Value::Null;
}
if let (Some(Value::String(class_name)), Some(fields_arr)) =
(map.get("__class__"), map.get("__fields__"))
{
if let Some(arr) = fields_arr.as_array() {
if let Some(registry) = class_registry {
if let Some(class_def) = registry.get(class_name.as_str()) {
if let Some(&idx) = class_def.field_index.get(field) {
return arr.get(idx).cloned().unwrap_or(Value::Null);
}
}
}
if let Some(index_map) = map.get("__field_index__") {
if let Some(idx_obj) = index_map.as_object() {
if let Some(idx_val) = idx_obj.get(field) {
if let Some(idx) = idx_val.as_u64() {
return arr.get(idx as usize).cloned().unwrap_or(Value::Null);
}
}
}
}
if field.starts_with("__") {
return map.get(field).cloned().unwrap_or(Value::Null);
}
return Value::Null;
}
}
map.get(field).cloned().unwrap_or(Value::Null)
}
Value::Array(arr) => {
if let Ok(idx) = field.parse::<usize>() {
arr.get(idx).cloned().unwrap_or(Value::Null)
} else {
Value::Null
}
}
Value::String(s) => {
if let Ok(parsed) = serde_json::from_str::<Value>(s) {
resolve_field(&parsed, field, class_registry)
} else {
Value::Null
}
}
_ => Value::Null,
}
}