use {
crate::{
Ident,
database::storage::Partitions,
protocol::{
jsonrpc,
lsp::{
LSPAny,
LanguageServer,
},
},
record::LaburnumRecordRef,
scheduler::task::TaskContext,
},
otel::exception,
serde_json::Value,
};
pub async fn handle_query_records<P: Partitions, T: LanguageServer<P>>(
arguments: &[Value],
ctx: &mut TaskContext<P, T>,
) -> jsonrpc::Result<Option<LSPAny>> {
let arg = arguments.first().ok_or_else(|| {
exception!(
"invalid_params",
jsonrpc::Error::invalid_params("Missing arguments object")
)
})?;
let partition_key: Ident = arg
.get("partitionKey")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.ok_or_else(|| {
exception!(
"invalid_params",
jsonrpc::Error::invalid_params(
"Missing or invalid 'partitionKey' field"
)
)
})?;
let _get_latest_only = match arg.get("latest") {
| None => false,
| Some(v) => {
v.as_bool().ok_or_else(|| {
exception!(
"invalid_params",
jsonrpc::Error::invalid_params(
"Invalid value for 'latest', accepts true or false"
)
)
})?
},
};
let _sort_key_type = if let Some(sort_key_param) = arg.get("sortKey") {
if sort_key_param.is_string() {
"exact"
} else if sort_key_param.is_object() {
"prefix"
} else {
"unknown"
}
} else {
"all"
};
let query_client = ctx.query_client();
let query_results = if let Some(sort_key_param) = arg.get("sortKey") {
if let Some(exact_key) = sort_key_param.as_str() {
query_client
.get_record(partition_key, exact_key.to_string())
.await
} else if let Some(obj) = sort_key_param.as_object() {
if let Some(begins_with) = obj.get("beginsWith").and_then(|v| v.as_str())
{
query_client
.prefix_internal(partition_key, begins_with.to_string())
.await
} else {
return Err(exception!(
"invalid_params",
jsonrpc::Error::invalid_params(
"sortKey object must have 'beginsWith' property"
)
));
}
} else {
return Err(exception!(
"invalid_params",
jsonrpc::Error::invalid_params(
"sortKey must be a string or object with 'beginsWith'"
)
));
}
} else {
query_client
.prefix_internal(partition_key, String::new())
.await
};
let source_cache = ctx.source_cache();
let source_cache_guard = source_cache.read();
let records: Vec<Value> = query_results
.iter_with_metadata()
.filter_map(|(record_meta, record_ref)| {
let record_ref = record_ref?;
let serialized = serde_json::value::Serializer;
let mut record_value = match record_ref
.serialize_with_source_cache(&source_cache_guard, serialized)
{
| Ok(value) => value,
| Err(_e) => {
return None;
},
};
if let Some(obj) = record_value.as_object_mut() {
obj.insert(
"partition_key".to_string(),
serde_json::to_value(record_meta.partition_key)
.unwrap_or(serde_json::Value::Null),
);
obj.insert(
"sort_key".to_string(),
serde_json::json!(record_meta.sort_key),
);
obj.insert(
"content_hash".to_string(),
serde_json::to_value(record_meta.content_hash)
.unwrap_or(serde_json::Value::Null),
);
}
Some(record_value)
})
.collect();
Ok(Some(serde_json::json!(records)))
}