laburnum 1.17.1

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  crate::{
    Ident,
    database::storage::Partitions,
    protocol::{
      jsonrpc,
      lsp::{
        LSPAny,
        LanguageServer,
      },
    },
    record::LaburnumRecordRef,
    scheduler::task::TaskContext,
  },
  otel::exception,
  serde_json::Value,
};

pub async fn handle_query_records<P: Partitions, T: LanguageServer<P>>(
  arguments: &[Value],
  ctx: &mut TaskContext<P, T>,
) -> jsonrpc::Result<Option<LSPAny>> {
  let arg = arguments.first().ok_or_else(|| {
    exception!(
      "invalid_params",
      jsonrpc::Error::invalid_params("Missing arguments object")
    )
  })?;

  let partition_key: Ident = arg
    .get("partitionKey")
    .and_then(|v| serde_json::from_value(v.clone()).ok())
    .ok_or_else(|| {
      exception!(
        "invalid_params",
        jsonrpc::Error::invalid_params(
          "Missing or invalid 'partitionKey' field"
        )
      )
    })?;

  let _get_latest_only = match arg.get("latest") {
    | None => false,
    | Some(v) => {
      v.as_bool().ok_or_else(|| {
        exception!(
          "invalid_params",
          jsonrpc::Error::invalid_params(
            "Invalid value for 'latest', accepts true or false"
          )
        )
      })?
    },
  };

  let _sort_key_type = if let Some(sort_key_param) = arg.get("sortKey") {
    if sort_key_param.is_string() {
      "exact"
    } else if sort_key_param.is_object() {
      "prefix"
    } else {
      "unknown"
    }
  } else {
    "all"
  };

  let query_client = ctx.query_client();

  let query_results = if let Some(sort_key_param) = arg.get("sortKey") {
    if let Some(exact_key) = sort_key_param.as_str() {
      query_client
        .get_record(partition_key, exact_key.to_string())
        .await
    } else if let Some(obj) = sort_key_param.as_object() {
      if let Some(begins_with) = obj.get("beginsWith").and_then(|v| v.as_str())
      {
        query_client
          .prefix_internal(partition_key, begins_with.to_string())
          .await
      } else {
        return Err(exception!(
          "invalid_params",
          jsonrpc::Error::invalid_params(
            "sortKey object must have 'beginsWith' property"
          )
        ));
      }
    } else {
      return Err(exception!(
        "invalid_params",
        jsonrpc::Error::invalid_params(
          "sortKey must be a string or object with 'beginsWith'"
        )
      ));
    }
  } else {
    query_client
      .prefix_internal(partition_key, String::new())
      .await
  };

  let source_cache = ctx.source_cache();
  let source_cache_guard = source_cache.read();

  let records: Vec<Value> = query_results
    .iter_with_metadata()
    .filter_map(|(record_meta, record_ref)| {
      let record_ref = record_ref?;

      let serialized = serde_json::value::Serializer;
      let mut record_value = match record_ref
        .serialize_with_source_cache(&source_cache_guard, serialized)
      {
        | Ok(value) => value,
        | Err(_e) => {
          return None;
        },
      };

      if let Some(obj) = record_value.as_object_mut() {
        obj.insert(
          "partition_key".to_string(),
          serde_json::to_value(record_meta.partition_key)
            .unwrap_or(serde_json::Value::Null),
        );
        obj.insert(
          "sort_key".to_string(),
          serde_json::json!(record_meta.sort_key),
        );

        obj.insert(
          "content_hash".to_string(),
          serde_json::to_value(record_meta.content_hash)
            .unwrap_or(serde_json::Value::Null),
        );
      }

      Some(record_value)
    })
    .collect();

  Ok(Some(serde_json::json!(records)))
}