use std::{collections::HashMap, sync::Arc};
use num_bigint::BigInt;
use reifydb_value::{
error::{Diagnostic, Error},
fragment::Fragment,
params::Params,
util::bitvec::BitVec,
value::{
Value,
blob::Blob,
container::{
any::AnyContainer, blob::BlobContainer, bool::BoolContainer, identity_id::IdentityIdContainer,
number::NumberContainer, temporal::TemporalContainer, utf8::Utf8Container, uuid::UuidContainer,
},
date::Date,
datetime::DateTime,
decimal::Decimal,
duration::Duration,
frame::{column::FrameColumn, data::FrameColumnData, frame::Frame},
identity::IdentityId,
int::Int,
row_number::RowNumber,
temporal::parse::datetime::parse_datetime,
time::Time,
uint::Uint,
uuid::{Uuid4, Uuid7},
value_type::ValueType,
},
};
use reifydb_wire_format::decode::decode_frames;
use serde_json::{Value as JsonValue, from_str as serde_json_from_str};
use tonic::{
Request, Status,
codec::Streaming,
metadata::{Ascii, MetadataMap, MetadataValue},
transport::Channel,
};
use uuid::Uuid;
use super::generated::{
AdminRequest as ProtoAdminRequest, AuthenticateRequest as ProtoAuthenticateRequest,
BatchSubscribeRequest as ProtoBatchSubscribeRequest, BatchSubscriptionEvent,
BatchUnsubscribeRequest as ProtoBatchUnsubscribeRequest, CommandRequest as ProtoCommandRequest, Format,
Frame as ProtoFrame, FramesPayload, LogoutRequest as ProtoLogoutRequest, NamedParams,
OperationRequest as ProtoOperationRequest, Params as ProtoParams, PositionalParams,
QueryRequest as ProtoQueryRequest, SubscribeRequest as ProtoSubscribeRequest, SubscriptionEvent, TypedValue,
UnsubscribeRequest as ProtoUnsubscribeRequest, admin_response, batch_subscription_event, change_event,
command_response, operation_response, params::Params as ProtoParamsOneof, query_response,
reify_db_client::ReifyDbClient, subscription_event,
};
use crate::{
AdminResult, BatchChangeEntry, BatchChangePayload, BatchMemberClosedPayload, BatchMemberInfo, BatchPushEvent,
ChangeKind, ChangePayload, CommandResult, LoginResult, QueryResult, ResponseMeta, WireFormat,
changes::{read_op_kind, strip_op_column},
client::{BatchSubscription as ClientBatchSubscription, ReifyClient, Subscription as ClientSubscription},
subscription::{BatchItem, SubscriptionConfig, build_subscription_rql},
};
fn extract_meta(metadata: &MetadataMap) -> Option<ResponseMeta> {
let fingerprint = metadata.get("x-fingerprint").and_then(|v| v.to_str().ok())?;
let duration = metadata.get("x-duration").and_then(|v| v.to_str().ok())?;
Some(ResponseMeta {
fingerprint: fingerprint.to_string(),
duration: duration.to_string(),
})
}
pub enum RawChangePayload {
Rbcf(Vec<u8>),
Proto(FramesPayload),
Empty,
}
impl RawChangePayload {
pub fn into_frames(self) -> Vec<Frame> {
match self {
Self::Rbcf(bytes) => decode_frames(&bytes).unwrap_or_default(),
Self::Proto(fp) => proto_frames_to_frames(fp.frames),
Self::Empty => Vec::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct GrpcChange {
pub kind: ChangeKind,
pub frames: Vec<Frame>,
}
fn classify_frames(frames: Vec<Frame>) -> GrpcChange {
let kind = frames.first().map(read_op_kind).unwrap_or(ChangeKind::Insert);
let frames = frames.into_iter().map(strip_op_column).collect();
GrpcChange {
kind,
frames,
}
}
#[derive(Clone)]
pub struct GrpcClient {
inner: ReifyDbClient<Channel>,
token: Option<String>,
format: WireFormat,
}
impl GrpcClient {
pub async fn connect(url: &str, format: WireFormat) -> Result<Self, Error> {
if format == WireFormat::Json {
return Err(Error(Box::new(Diagnostic {
code: "INVALID_FORMAT".to_string(),
message: "WireFormat::Json is not supported for GrpcClient".to_string(),
..Default::default()
})));
}
let channel =
Channel::from_shared(url.to_string()).unwrap().tcp_nodelay(true).connect().await.map_err(
|e| {
Error(Box::new(Diagnostic {
code: "GRPC_CONNECT".to_string(),
message: format!("Failed to connect: {}", e),
..Default::default()
}))
},
)?;
Ok(Self {
inner: ReifyDbClient::new(channel),
token: None,
format,
})
}
pub fn authenticate(&mut self, token: &str) {
self.token = Some(token.to_string());
}
pub async fn login_with_password(&mut self, identifier: &str, password: &str) -> Result<LoginResult, Error> {
let mut credentials = HashMap::new();
credentials.insert("identifier".to_string(), identifier.to_string());
credentials.insert("password".to_string(), password.to_string());
self.login("password", credentials).await
}
pub async fn login_with_token(&mut self, token: &str) -> Result<LoginResult, Error> {
let mut credentials = HashMap::new();
credentials.insert("token".to_string(), token.to_string());
self.login("token", credentials).await
}
pub async fn login(
&mut self,
method: &str,
credentials: HashMap<String, String>,
) -> Result<LoginResult, Error> {
let request = ProtoAuthenticateRequest {
method: method.to_string(),
credentials,
};
let mut client = self.inner.clone();
let response = client.authenticate(Request::new(request)).await.map_err(status_to_error)?;
let inner = response.into_inner();
if inner.status == "authenticated" {
self.token = Some(inner.token.clone());
Ok(LoginResult {
token: inner.token,
identity: inner.identity,
})
} else {
Err(Error(Box::new(Diagnostic {
code: "AUTH_FAILED".to_string(),
message: inner.reason,
..Default::default()
})))
}
}
pub async fn logout(&mut self) -> Result<(), Error> {
if self.token.is_none() {
return Ok(());
}
let request = ProtoLogoutRequest {};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
client.logout(req).await.map_err(status_to_error)?;
self.token = None;
Ok(())
}
fn wire_format(&self) -> i32 {
match self.format {
WireFormat::Rbcf => Format::Rbcf as i32,
_ => Format::Proto as i32,
}
}
pub async fn admin(&self, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
Ok(self.admin_with_meta(rql, params).await?.frames)
}
pub async fn admin_with_meta(&self, rql: &str, params: Option<Params>) -> Result<AdminResult, Error> {
let request = ProtoAdminRequest {
rql: rql.to_string(),
params: params.and_then(params_to_proto),
format: self.wire_format(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
let response = client.admin(req).await.map_err(status_to_error)?;
let meta = extract_meta(response.metadata());
let frames = decode_admin_payload(response.into_inner().payload)?;
Ok(AdminResult {
frames,
meta,
})
}
pub async fn command(&self, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
Ok(self.command_with_meta(rql, params).await?.frames)
}
pub async fn command_with_meta(&self, rql: &str, params: Option<Params>) -> Result<CommandResult, Error> {
let request = ProtoCommandRequest {
rql: rql.to_string(),
params: params.and_then(params_to_proto),
format: self.wire_format(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
let response = client.command(req).await.map_err(status_to_error)?;
let meta = extract_meta(response.metadata());
let frames = decode_command_payload(response.into_inner().payload)?;
Ok(CommandResult {
frames,
meta,
})
}
pub async fn query(&self, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
Ok(self.query_with_meta(rql, params).await?.frames)
}
pub async fn query_with_meta(&self, rql: &str, params: Option<Params>) -> Result<QueryResult, Error> {
let request = ProtoQueryRequest {
rql: rql.to_string(),
params: params.and_then(params_to_proto),
format: self.wire_format(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
let response = client.query(req).await.map_err(status_to_error)?;
let meta = extract_meta(response.metadata());
let frames = decode_query_payload(response.into_inner().payload)?;
Ok(QueryResult {
frames,
meta,
})
}
pub async fn call(&self, name: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
Ok(self.call_with_meta(name, params).await?.frames)
}
pub async fn call_with_meta(&self, name: &str, params: Option<Params>) -> Result<CommandResult, Error> {
let request = ProtoOperationRequest {
name: name.to_string(),
params: params.and_then(params_to_proto),
format: self.wire_format(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
let response = client.call(req).await.map_err(status_to_error)?;
let meta = extract_meta(response.metadata());
let frames = decode_operation_payload(response.into_inner().payload)?;
Ok(CommandResult {
frames,
meta,
})
}
pub async fn subscribe(&self, rql: &str, config: SubscriptionConfig) -> Result<GrpcSubscription, Error> {
let request = ProtoSubscribeRequest {
rql: build_subscription_rql(rql, &config),
format: self.wire_format(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
let response = client.subscribe(req).await.map_err(status_to_error)?;
let mut stream = response.into_inner();
let first = stream.message().await.map_err(status_to_error)?.ok_or_else(|| {
Error(Box::new(Diagnostic {
code: "GRPC_SUBSCRIBE".to_string(),
message: "Stream closed before receiving subscription ID".to_string(),
..Default::default()
}))
})?;
let subscription_id = match first.event {
Some(subscription_event::Event::Subscribed(s)) => s.subscription_id,
_ => {
return Err(Error(Box::new(Diagnostic {
code: "GRPC_SUBSCRIBE".to_string(),
message: "Expected SubscribedEvent as first message".to_string(),
..Default::default()
})));
}
};
Ok(GrpcSubscription {
subscription_id,
stream,
format: self.format,
})
}
pub async fn unsubscribe(&self, subscription_id: &str) -> Result<(), Error> {
let request = ProtoUnsubscribeRequest {
subscription_id: subscription_id.to_string(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
client.unsubscribe(req).await.map_err(status_to_error)?;
Ok(())
}
pub async fn batch_subscribe(&self, items: &[BatchItem<'_>]) -> Result<BatchGrpcSubscription, Error> {
let request = ProtoBatchSubscribeRequest {
rql: items.iter().map(|i| build_subscription_rql(i.rql, &i.config)).collect(),
format: self.wire_format(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
let response = client.batch_subscribe(req).await.map_err(status_to_error)?;
let mut stream = response.into_inner();
let first = stream.message().await.map_err(status_to_error)?.ok_or_else(|| {
Error(Box::new(Diagnostic {
code: "GRPC_BATCH_SUBSCRIBE".to_string(),
message: "Stream closed before receiving batch subscribed event".to_string(),
..Default::default()
}))
})?;
let (batch_id, members) = match first.event {
Some(batch_subscription_event::Event::Subscribed(s)) => {
let members: Vec<BatchMemberHandle> = s
.members
.into_iter()
.map(|m| BatchMemberHandle {
index: m.index as usize,
subscription_id: m.subscription_id,
})
.collect();
(s.batch_id, members)
}
_ => {
return Err(Error(Box::new(Diagnostic {
code: "GRPC_BATCH_SUBSCRIBE".to_string(),
message: "Expected BatchSubscribedEvent as first message".to_string(),
..Default::default()
})));
}
};
Ok(BatchGrpcSubscription {
batch_id,
members,
stream,
})
}
pub async fn batch_unsubscribe(&self, batch_id: &str) -> Result<(), Error> {
let request = ProtoBatchUnsubscribeRequest {
batch_id: batch_id.to_string(),
};
let mut client = self.inner.clone();
let mut req = Request::new(request);
self.attach_auth(&mut req);
client.batch_unsubscribe(req).await.map_err(status_to_error)?;
Ok(())
}
fn attach_auth<T>(&self, request: &mut Request<T>) {
if let Some(ref token) = self.token {
let bearer = format!("Bearer {}", token);
if let Ok(value) = bearer.parse::<MetadataValue<Ascii>>() {
request.metadata_mut().insert("authorization", value);
}
}
}
}
pub struct GrpcSubscription {
subscription_id: String,
stream: Streaming<SubscriptionEvent>,
#[allow(dead_code)]
format: WireFormat,
}
#[derive(Debug, Clone)]
pub struct BatchMemberHandle {
pub index: usize,
pub subscription_id: String,
}
pub struct BatchGrpcSubscription {
batch_id: String,
members: Vec<BatchMemberHandle>,
stream: Streaming<BatchSubscriptionEvent>,
}
#[derive(Debug, Clone)]
pub struct BatchFramesEnvelope {
pub batch_id: String,
pub entries: HashMap<String, GrpcChange>,
pub entry_errors: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub enum BatchStreamEvent {
Change(BatchFramesEnvelope),
MemberClosed {
batch_id: String,
subscription_id: String,
},
}
impl BatchGrpcSubscription {
pub fn batch_id(&self) -> &str {
&self.batch_id
}
pub fn members(&self) -> &[BatchMemberHandle] {
&self.members
}
pub async fn recv(&mut self) -> Option<BatchStreamEvent> {
loop {
let msg = self.stream.message().await.ok()??;
match msg.event {
Some(batch_subscription_event::Event::Change(change)) => {
let batch_id = change.batch_id;
let mut entries: HashMap<String, GrpcChange> = HashMap::new();
let mut entry_errors: HashMap<String, String> = HashMap::new();
for entry in change.entries {
let sub_id = entry.subscription_id;
match entry.change.and_then(|c| c.payload) {
Some(change_event::Payload::Rbcf(bytes)) => {
match decode_frames(&bytes) {
Ok(frames) => {
entries.insert(
sub_id,
classify_frames(frames),
);
}
Err(e) => {
entry_errors.insert(
sub_id.clone(),
e.to_string(),
);
entries.insert(
sub_id,
classify_frames(Vec::new()),
);
}
}
}
Some(change_event::Payload::Frames(fp)) => {
entries.insert(
sub_id,
classify_frames(proto_frames_to_frames(
fp.frames,
)),
);
}
None => {
entries.insert(sub_id, classify_frames(Vec::new()));
}
}
}
return Some(BatchStreamEvent::Change(BatchFramesEnvelope {
batch_id,
entries,
entry_errors,
}));
}
Some(batch_subscription_event::Event::MemberClosed(m)) => {
return Some(BatchStreamEvent::MemberClosed {
batch_id: m.batch_id,
subscription_id: m.subscription_id,
});
}
Some(batch_subscription_event::Event::Subscribed(_)) => {
continue;
}
None => continue,
}
}
}
}
impl GrpcSubscription {
pub fn subscription_id(&self) -> &str {
&self.subscription_id
}
pub async fn recv(&mut self) -> Option<GrpcChange> {
self.recv_raw().await.map(|p| classify_frames(p.into_frames()))
}
pub async fn recv_raw(&mut self) -> Option<RawChangePayload> {
loop {
let msg = self.stream.message().await.ok()??;
match msg.event {
Some(subscription_event::Event::Change(change)) => {
let payload = match change.payload {
Some(change_event::Payload::Rbcf(bytes)) => {
RawChangePayload::Rbcf(bytes)
}
Some(change_event::Payload::Frames(fp)) => RawChangePayload::Proto(fp),
None => RawChangePayload::Empty,
};
return Some(payload);
}
Some(subscription_event::Event::Subscribed(_)) => {
continue;
}
None => continue,
}
}
}
}
fn decode_admin_payload(payload: Option<admin_response::Payload>) -> Result<Vec<Frame>, Error> {
match payload {
Some(admin_response::Payload::Rbcf(bytes)) => decode_rbcf(&bytes),
Some(admin_response::Payload::Frames(fp)) => Ok(proto_frames_to_frames(fp.frames)),
None => Ok(Vec::new()),
}
}
fn decode_command_payload(payload: Option<command_response::Payload>) -> Result<Vec<Frame>, Error> {
match payload {
Some(command_response::Payload::Rbcf(bytes)) => decode_rbcf(&bytes),
Some(command_response::Payload::Frames(fp)) => Ok(proto_frames_to_frames(fp.frames)),
None => Ok(Vec::new()),
}
}
fn decode_query_payload(payload: Option<query_response::Payload>) -> Result<Vec<Frame>, Error> {
match payload {
Some(query_response::Payload::Rbcf(bytes)) => decode_rbcf(&bytes),
Some(query_response::Payload::Frames(fp)) => Ok(proto_frames_to_frames(fp.frames)),
None => Ok(Vec::new()),
}
}
fn decode_operation_payload(payload: Option<operation_response::Payload>) -> Result<Vec<Frame>, Error> {
match payload {
Some(operation_response::Payload::Rbcf(bytes)) => decode_rbcf(&bytes),
Some(operation_response::Payload::Frames(fp)) => Ok(proto_frames_to_frames(fp.frames)),
None => Ok(Vec::new()),
}
}
fn decode_rbcf(bytes: &[u8]) -> Result<Vec<Frame>, Error> {
decode_frames(bytes).map_err(|e| {
Error(Box::new(Diagnostic {
code: "RBCF_DECODE".to_string(),
message: format!("failed to decode RBCF payload: {}", e),
..Default::default()
}))
})
}
fn params_to_proto(params: Params) -> Option<ProtoParams> {
match params {
Params::None => None,
Params::Positional(values) => Some(ProtoParams {
params: Some(ProtoParamsOneof::Positional(PositionalParams {
values: Arc::unwrap_or_clone(values).into_iter().map(value_to_typed_value).collect(),
})),
}),
Params::Named(map) => Some(ProtoParams {
params: Some(ProtoParamsOneof::Named(NamedParams {
values: Arc::unwrap_or_clone(map)
.into_iter()
.map(|(k, v)| (k, value_to_typed_value(v)))
.collect(),
})),
}),
}
}
fn value_to_typed_value(value: Value) -> TypedValue {
let (type_u32, bytes) = match value {
Value::None {
inner,
} => ((0x80 | inner.to_u8()) as u32, vec![]),
Value::Boolean(b) => (ValueType::Boolean.to_u8() as u32, vec![b as u8]),
Value::Float4(f) => (ValueType::Float4.to_u8() as u32, f.to_le_bytes().to_vec()),
Value::Float8(f) => (ValueType::Float8.to_u8() as u32, f.to_le_bytes().to_vec()),
Value::Int1(v) => (ValueType::Int1.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Int2(v) => (ValueType::Int2.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Int4(v) => (ValueType::Int4.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Int8(v) => (ValueType::Int8.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Int16(v) => (ValueType::Int16.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Uint1(v) => (ValueType::Uint1.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Uint2(v) => (ValueType::Uint2.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Uint4(v) => (ValueType::Uint4.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Uint8(v) => (ValueType::Uint8.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Uint16(v) => (ValueType::Uint16.to_u8() as u32, v.to_le_bytes().to_vec()),
Value::Utf8(s) => (ValueType::Utf8.to_u8() as u32, s.into_bytes()),
Value::Uuid4(u) => (ValueType::Uuid4.to_u8() as u32, u.0.as_bytes().to_vec()),
Value::Uuid7(u) => (ValueType::Uuid7.to_u8() as u32, u.0.as_bytes().to_vec()),
Value::Date(d) => (ValueType::Date.to_u8() as u32, d.to_days_since_epoch().to_le_bytes().to_vec()),
Value::DateTime(dt) => (ValueType::DateTime.to_u8() as u32, dt.to_nanos().to_le_bytes().to_vec()),
Value::Time(t) => (ValueType::Time.to_u8() as u32, t.to_nanos_since_midnight().to_le_bytes().to_vec()),
Value::Duration(d) => {
let mut buf = Vec::with_capacity(16);
buf.extend_from_slice(&d.get_months().to_le_bytes());
buf.extend_from_slice(&d.get_days().to_le_bytes());
buf.extend_from_slice(&d.get_nanos().to_le_bytes());
(ValueType::Duration.to_u8() as u32, buf)
}
Value::Blob(b) => (ValueType::Blob.to_u8() as u32, b.as_bytes().to_vec()),
Value::Decimal(d) => (ValueType::Decimal.to_u8() as u32, d.to_string().into_bytes()),
Value::IdentityId(id) => (ValueType::IdentityId.to_u8() as u32, id.0.0.as_bytes().to_vec()),
Value::Int(big) => (ValueType::Int.to_u8() as u32, big.0.to_signed_bytes_le()),
Value::Uint(big) => (ValueType::Uint.to_u8() as u32, big.0.to_signed_bytes_le()),
Value::Any(inner) => return value_to_typed_value(*inner),
Value::DictionaryId(id) => {
(ValueType::DictionaryId.to_u8() as u32, id.to_u128().to_le_bytes().to_vec())
}
Value::Type(t) => (ValueType::Any.to_u8() as u32, vec![t.to_u8()]),
Value::List(items) | Value::Tuple(items) => {
let mut buf = Vec::new();
buf.extend_from_slice(&(items.len() as u32).to_le_bytes());
for item in &items {
let tv = value_to_typed_value(item.clone());
buf.extend_from_slice(&tv.r#type.to_le_bytes());
buf.extend_from_slice(&(tv.value.len() as u32).to_le_bytes());
buf.extend_from_slice(&tv.value);
}
(ValueType::Any.to_u8() as u32, buf)
}
Value::Record(fields) => {
let mut buf = Vec::new();
buf.extend_from_slice(&(fields.len() as u32).to_le_bytes());
for (key, value) in fields {
let key_bytes = key.as_bytes();
buf.extend_from_slice(&(key_bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(key_bytes);
let tv = value_to_typed_value(value);
buf.extend_from_slice(&tv.r#type.to_le_bytes());
buf.extend_from_slice(&(tv.value.len() as u32).to_le_bytes());
buf.extend_from_slice(&tv.value);
}
(ValueType::Any.to_u8() as u32, buf)
}
};
TypedValue {
r#type: type_u32,
value: bytes,
}
}
fn proto_frames_to_frames(frames: Vec<ProtoFrame>) -> Vec<Frame> {
frames.into_iter()
.map(|f| {
let row_numbers: Vec<RowNumber> = f.row_numbers.into_iter().map(RowNumber::new).collect();
let columns: Vec<FrameColumn> = f
.columns
.into_iter()
.map(|c| {
let ty = ValueType::from_u8(c.r#type as u8);
let data = decode_column_data(ty, &c.payload, &c.bitvec);
FrameColumn {
name: c.name,
data,
}
})
.collect();
let created_at = f
.created_at
.iter()
.filter_map(|s| parse_datetime(Fragment::internal(s)).ok())
.collect();
let updated_at = f
.updated_at
.iter()
.filter_map(|s| parse_datetime(Fragment::internal(s)).ok())
.collect();
Frame {
row_numbers,
created_at,
updated_at,
columns,
}
})
.collect()
}
fn decode_column_data(ty: ValueType, data: &[u8], bitvec_bytes: &[u8]) -> FrameColumnData {
match ty {
ValueType::Option(inner_type) => {
let bitvec = decode_bitvec(bitvec_bytes);
let inner = decode_column_data(*inner_type, data, &[]);
FrameColumnData::Option {
inner: Box::new(inner),
bitvec,
}
}
ValueType::Boolean => {
let bitvec = decode_bitvec(data);
let values: Vec<bool> = bitvec.iter().collect();
FrameColumnData::Bool(BoolContainer::new(values))
}
ValueType::Float4 => {
let values: Vec<f32> = data
.chunks_exact(4)
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Float4(NumberContainer::new(values))
}
ValueType::Float8 => {
let values: Vec<f64> = data
.chunks_exact(8)
.map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Float8(NumberContainer::new(values))
}
ValueType::Int1 => {
let values: Vec<i8> = data.iter().map(|&b| b as i8).collect();
FrameColumnData::Int1(NumberContainer::new(values))
}
ValueType::Int2 => {
let values: Vec<i16> = data
.chunks_exact(2)
.map(|chunk| i16::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Int2(NumberContainer::new(values))
}
ValueType::Int4 => {
let values: Vec<i32> = data
.chunks_exact(4)
.map(|chunk| i32::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Int4(NumberContainer::new(values))
}
ValueType::Int8 => {
let values: Vec<i64> = data
.chunks_exact(8)
.map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Int8(NumberContainer::new(values))
}
ValueType::Int16 => {
let values: Vec<i128> = data
.chunks_exact(16)
.map(|chunk| i128::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Int16(NumberContainer::new(values))
}
ValueType::Uint1 => {
let values: Vec<u8> = data.to_vec();
FrameColumnData::Uint1(NumberContainer::new(values))
}
ValueType::Uint2 => {
let values: Vec<u16> = data
.chunks_exact(2)
.map(|chunk| u16::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Uint2(NumberContainer::new(values))
}
ValueType::Uint4 => {
let values: Vec<u32> = data
.chunks_exact(4)
.map(|chunk| u32::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Uint4(NumberContainer::new(values))
}
ValueType::Uint8 => {
let values: Vec<u64> = data
.chunks_exact(8)
.map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Uint8(NumberContainer::new(values))
}
ValueType::Uint16 => {
let values: Vec<u128> = data
.chunks_exact(16)
.map(|chunk| u128::from_le_bytes(chunk.try_into().unwrap()))
.collect();
FrameColumnData::Uint16(NumberContainer::new(values))
}
ValueType::Utf8 => {
let values = decode_length_prefixed_strings(data);
FrameColumnData::Utf8(Utf8Container::new(values))
}
ValueType::Date => {
let values: Vec<Date> = data
.chunks_exact(4)
.map(|chunk| {
let days = i32::from_le_bytes(chunk.try_into().unwrap());
Date::from_days_since_epoch(days)
.unwrap_or_else(|| Date::from_ymd(1970, 1, 1).unwrap())
})
.collect();
FrameColumnData::Date(TemporalContainer::new(values))
}
ValueType::DateTime => {
let values: Vec<DateTime> = data
.chunks_exact(8)
.map(|chunk| {
let nanos = u64::from_le_bytes(chunk.try_into().unwrap());
DateTime::from_nanos(nanos)
})
.collect();
FrameColumnData::DateTime(TemporalContainer::new(values))
}
ValueType::Time => {
let values: Vec<Time> = data
.chunks_exact(8)
.map(|chunk| {
let nanos = u64::from_le_bytes(chunk.try_into().unwrap());
Time::from_nanos_since_midnight(nanos)
.unwrap_or_else(|| Time::from_hms(0, 0, 0).unwrap())
})
.collect();
FrameColumnData::Time(TemporalContainer::new(values))
}
ValueType::Duration => {
let values: Vec<Duration> = data
.chunks_exact(16)
.map(|chunk| {
let months = i32::from_le_bytes(chunk[..4].try_into().unwrap());
let days = i32::from_le_bytes(chunk[4..8].try_into().unwrap());
let nanos = i64::from_le_bytes(chunk[8..16].try_into().unwrap());
Duration::new(months, days, nanos).unwrap()
})
.collect();
FrameColumnData::Duration(TemporalContainer::new(values))
}
ValueType::IdentityId => {
let values: Vec<IdentityId> = data
.chunks_exact(16)
.map(|chunk| {
let uuid = Uuid::from_bytes(chunk.try_into().unwrap());
IdentityId(Uuid7(uuid))
})
.collect();
FrameColumnData::IdentityId(IdentityIdContainer::new(values))
}
ValueType::Uuid4 => {
let values: Vec<Uuid4> = data
.chunks_exact(16)
.map(|chunk| {
let uuid = Uuid::from_bytes(chunk.try_into().unwrap());
Uuid4(uuid)
})
.collect();
FrameColumnData::Uuid4(UuidContainer::new(values))
}
ValueType::Uuid7 => {
let values: Vec<Uuid7> = data
.chunks_exact(16)
.map(|chunk| {
let uuid = Uuid::from_bytes(chunk.try_into().unwrap());
Uuid7(uuid)
})
.collect();
FrameColumnData::Uuid7(UuidContainer::new(values))
}
ValueType::Blob => {
let mut values = Vec::new();
let mut pos = 0;
while pos + 4 <= data.len() {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let bytes = data[pos..pos + len].to_vec();
pos += len;
values.push(Blob::new(bytes));
}
FrameColumnData::Blob(BlobContainer::new(values))
}
ValueType::Int => {
let mut values = Vec::new();
let mut pos = 0;
while pos + 4 <= data.len() {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let bytes = &data[pos..pos + len];
pos += len;
values.push(Int(BigInt::from_signed_bytes_le(bytes)));
}
FrameColumnData::Int(NumberContainer::new(values))
}
ValueType::Uint => {
let mut values = Vec::new();
let mut pos = 0;
while pos + 4 <= data.len() {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let bytes = &data[pos..pos + len];
pos += len;
values.push(Uint(BigInt::from_signed_bytes_le(bytes)));
}
FrameColumnData::Uint(NumberContainer::new(values))
}
ValueType::Decimal => {
let strings = decode_length_prefixed_strings(data);
let values: Vec<Decimal> = strings
.into_iter()
.map(|s| s.parse::<Decimal>().unwrap_or_else(|_| Decimal::from_i64(0)))
.collect();
FrameColumnData::Decimal(NumberContainer::new(values))
}
ValueType::Any => {
let mut values: Vec<Box<Value>> = Vec::new();
let mut pos = 0;
while pos < data.len() {
let (val, consumed) = decode_any_value(&data[pos..]);
pos += consumed;
values.push(Box::new(val));
}
FrameColumnData::Any(AnyContainer::new(values))
}
ValueType::DictionaryId => {
FrameColumnData::Utf8(Utf8Container::new(vec![]))
}
ValueType::List(_) | ValueType::Record(_) | ValueType::Tuple(_) => {
FrameColumnData::Utf8(Utf8Container::new(vec![]))
}
}
}
fn decode_bitvec(data: &[u8]) -> BitVec {
if data.len() < 4 {
return BitVec::default();
}
let num_bits = u32::from_le_bytes(data[..4].try_into().unwrap()) as usize;
let byte_count = num_bits.div_ceil(8);
let bits = data[4..4 + byte_count].to_vec();
BitVec::from_raw(bits, num_bits)
}
fn decode_length_prefixed_strings(data: &[u8]) -> Vec<String> {
let mut values = Vec::new();
let mut pos = 0;
while pos + 4 <= data.len() {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let s = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
pos += len;
values.push(s);
}
values
}
fn decode_any_value(data: &[u8]) -> (Value, usize) {
let type_tag = data[0];
let ty = ValueType::from_u8(type_tag);
let mut pos = 1;
match ty {
ValueType::Option(inner) => {
(
Value::None {
inner: *inner,
},
pos,
)
}
ValueType::Boolean => {
let v = data[pos] != 0;
(Value::Boolean(v), pos + 1)
}
ValueType::Float4 => {
let v = f32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
(Value::float4(v), pos + 4)
}
ValueType::Float8 => {
let v = f64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
(Value::float8(v), pos + 8)
}
ValueType::Int1 => {
let v = data[pos] as i8;
(Value::Int1(v), pos + 1)
}
ValueType::Int2 => {
let v = i16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
(Value::Int2(v), pos + 2)
}
ValueType::Int4 => {
let v = i32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
(Value::Int4(v), pos + 4)
}
ValueType::Int8 => {
let v = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
(Value::Int8(v), pos + 8)
}
ValueType::Int16 => {
let v = i128::from_le_bytes(data[pos..pos + 16].try_into().unwrap());
(Value::Int16(v), pos + 16)
}
ValueType::Uint1 => {
let v = data[pos];
(Value::Uint1(v), pos + 1)
}
ValueType::Uint2 => {
let v = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
(Value::Uint2(v), pos + 2)
}
ValueType::Uint4 => {
let v = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
(Value::Uint4(v), pos + 4)
}
ValueType::Uint8 => {
let v = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
(Value::Uint8(v), pos + 8)
}
ValueType::Uint16 => {
let v = u128::from_le_bytes(data[pos..pos + 16].try_into().unwrap());
(Value::Uint16(v), pos + 16)
}
ValueType::Utf8 => {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let s = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
(Value::Utf8(s), pos + len)
}
ValueType::Date => {
let days = i32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
let d = Date::from_days_since_epoch(days)
.unwrap_or_else(|| Date::from_ymd(1970, 1, 1).unwrap());
(Value::Date(d), pos + 4)
}
ValueType::DateTime => {
let nanos = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
let dt = DateTime::from_nanos(nanos);
(Value::DateTime(dt), pos + 8)
}
ValueType::Time => {
let nanos = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
let t = Time::from_nanos_since_midnight(nanos)
.unwrap_or_else(|| Time::from_hms(0, 0, 0).unwrap());
(Value::Time(t), pos + 8)
}
ValueType::Duration => {
let months = i32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
let days = i32::from_le_bytes(data[pos + 4..pos + 8].try_into().unwrap());
let nanos = i64::from_le_bytes(data[pos + 8..pos + 16].try_into().unwrap());
(Value::Duration(Duration::new(months, days, nanos).unwrap()), pos + 16)
}
ValueType::IdentityId => {
let uuid = Uuid::from_bytes(data[pos..pos + 16].try_into().unwrap());
(Value::IdentityId(IdentityId(Uuid7(uuid))), pos + 16)
}
ValueType::Uuid4 => {
let uuid = Uuid::from_bytes(data[pos..pos + 16].try_into().unwrap());
(Value::Uuid4(Uuid4(uuid)), pos + 16)
}
ValueType::Uuid7 => {
let uuid = Uuid::from_bytes(data[pos..pos + 16].try_into().unwrap());
(Value::Uuid7(Uuid7(uuid)), pos + 16)
}
ValueType::Blob => {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let bytes = data[pos..pos + len].to_vec();
(Value::Blob(Blob::new(bytes)), pos + len)
}
ValueType::Int => {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let bytes = &data[pos..pos + len];
(Value::Int(Int(BigInt::from_signed_bytes_le(bytes))), pos + len)
}
ValueType::Uint => {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let bytes = &data[pos..pos + len];
(Value::Uint(Uint(BigInt::from_signed_bytes_le(bytes))), pos + len)
}
ValueType::Decimal => {
let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let s = String::from_utf8_lossy(&data[pos..pos + len]).into_owned();
let d = s.parse::<Decimal>().unwrap_or_else(|_| Decimal::from_i64(0));
(Value::Decimal(d), pos + len)
}
ValueType::Any => {
let (inner_val, consumed) = decode_any_value(&data[pos..]);
(Value::Any(Box::new(inner_val)), pos + consumed)
}
ValueType::DictionaryId | ValueType::List(_) | ValueType::Record(_) | ValueType::Tuple(_) => {
(
Value::None {
inner: ty,
},
pos,
)
}
}
}
fn status_to_error(status: Status) -> Error {
if let Ok(diag) = serde_json_from_str::<Diagnostic>(status.message()) {
return Error(Box::new(diag));
}
Error(Box::new(Diagnostic {
code: format!("GRPC_{:?}", status.code()),
message: status.message().to_string(),
..Default::default()
}))
}
pub struct GrpcSubscriptionAdapter {
inner: GrpcSubscription,
}
#[async_trait::async_trait]
impl ClientSubscription for GrpcSubscriptionAdapter {
fn subscription_id(&self) -> &str {
self.inner.subscription_id()
}
async fn recv(&mut self) -> Option<ChangePayload> {
let change = self.inner.recv().await?;
Some(ChangePayload {
subscription_id: self.inner.subscription_id().to_string(),
kind: change.kind,
content_type: "application/vnd.reifydb.grpc".to_string(),
body: JsonValue::Null,
frames: Some(change.frames),
})
}
}
pub struct BatchGrpcSubscriptionAdapter {
inner: BatchGrpcSubscription,
members_info: Vec<BatchMemberInfo>,
}
#[async_trait::async_trait]
impl ClientBatchSubscription for BatchGrpcSubscriptionAdapter {
fn batch_id(&self) -> &str {
self.inner.batch_id()
}
fn members(&self) -> &[BatchMemberInfo] {
&self.members_info
}
async fn recv(&mut self) -> Option<BatchPushEvent> {
let event = self.inner.recv().await?;
Some(match event {
BatchStreamEvent::Change(env) => {
let batch_id = env.batch_id.clone();
let entries = env
.entries
.into_iter()
.map(|(sub_id, change)| BatchChangeEntry {
subscription_id: sub_id,
kind: change.kind,
content_type: "application/vnd.reifydb.grpc".to_string(),
body: JsonValue::Null,
frames: Some(change.frames),
decode_error: None,
})
.collect();
BatchPushEvent::Change(BatchChangePayload {
batch_id,
entries,
})
}
BatchStreamEvent::MemberClosed {
batch_id,
subscription_id,
} => BatchPushEvent::MemberClosed(BatchMemberClosedPayload {
batch_id,
subscription_id,
}),
})
}
}
#[async_trait::async_trait]
impl ReifyClient for GrpcClient {
fn wire_format(&self) -> WireFormat {
self.format
}
fn is_authenticated(&self) -> bool {
self.token.is_some()
}
async fn authenticate(&mut self, token: &str) -> Result<(), Error> {
GrpcClient::authenticate(self, token);
Ok(())
}
async fn login_with_password(&mut self, identifier: &str, password: &str) -> Result<LoginResult, Error> {
GrpcClient::login_with_password(self, identifier, password).await
}
async fn login_with_token(&mut self, token: &str) -> Result<LoginResult, Error> {
GrpcClient::login_with_token(self, token).await
}
async fn logout(&mut self) -> Result<(), Error> {
GrpcClient::logout(self).await
}
async fn admin(&self, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
GrpcClient::admin(self, rql, params).await
}
async fn admin_with_meta(&self, rql: &str, params: Option<Params>) -> Result<AdminResult, Error> {
GrpcClient::admin_with_meta(self, rql, params).await
}
async fn command(&self, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
GrpcClient::command(self, rql, params).await
}
async fn command_with_meta(&self, rql: &str, params: Option<Params>) -> Result<CommandResult, Error> {
GrpcClient::command_with_meta(self, rql, params).await
}
async fn query(&self, rql: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
GrpcClient::query(self, rql, params).await
}
async fn query_with_meta(&self, rql: &str, params: Option<Params>) -> Result<QueryResult, Error> {
GrpcClient::query_with_meta(self, rql, params).await
}
async fn call(&self, name: &str, params: Option<Params>) -> Result<Vec<Frame>, Error> {
GrpcClient::call(self, name, params).await
}
async fn call_with_meta(&self, name: &str, params: Option<Params>) -> Result<CommandResult, Error> {
GrpcClient::call_with_meta(self, name, params).await
}
async fn subscribe(&self, rql: &str, config: SubscriptionConfig) -> Result<Box<dyn ClientSubscription>, Error> {
let inner = GrpcClient::subscribe(self, rql, config).await?;
Ok(Box::new(GrpcSubscriptionAdapter {
inner,
}))
}
async fn unsubscribe(&self, subscription_id: &str) -> Result<(), Error> {
GrpcClient::unsubscribe(self, subscription_id).await
}
async fn batch_subscribe<'a>(
&self,
items: &[BatchItem<'a>],
) -> Result<Box<dyn ClientBatchSubscription>, Error> {
let inner = GrpcClient::batch_subscribe(self, items).await?;
let members_info: Vec<BatchMemberInfo> = inner
.members()
.iter()
.map(|m| BatchMemberInfo {
index: m.index,
subscription_id: m.subscription_id.clone(),
})
.collect();
Ok(Box::new(BatchGrpcSubscriptionAdapter {
inner,
members_info,
}))
}
async fn batch_unsubscribe(&self, batch_id: &str) -> Result<(), Error> {
GrpcClient::batch_unsubscribe(self, batch_id).await
}
}