use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet},
fmt,
fs::{self, File},
io::{self, BufWriter, Write},
path::Path,
sync::Arc,
};
use arrow_array::{Array, StringArray};
use arrow_schema::{DataType, Field};
use oxgraph_hyper::DirectedHyperedgeParticipants;
use oxgraph_hyper_bcsr::{
BcsrHyperedgeId, BcsrSnapshotError, BcsrSnapshotHypergraph,
build::{
HyperBuildError, HyperVertexId, HypergraphBuilder, export_bcsr_snapshot_with_properties,
},
};
use oxgraph_property::{
DecodedPropertyData, DecodedPropertyLayer, HyperPropertyLayers, IdFamily, LayerId, LayerRole,
PropertyError, PropertyLayer, PropertyLayerDescriptor, StorageMode,
};
use oxgraph_snapshot::{Snapshot, SnapshotError};
use saphyr_parser::{Event, Parser, ScanError};
/// Seed service definitions, embedded at compile time from the `services.yaml`
/// file that sits next to this source file. `main` parses it to build the
/// initial model.
const SERVICES_YAML: &str = include_str!("services.yaml");
/// Every failure the demo can report.
///
/// The tuple variants wrap an error from one of the libraries or from I/O (each
/// has a matching `From` impl so `?` can be used directly); the remaining
/// variants describe problems found while reading the `vertex_name` property
/// layer back out of a snapshot.
#[derive(Debug)]
enum DemoError {
/// The YAML scanner rejected the input.
Yaml(ScanError),
/// The hypergraph builder reported an error.
Build(HyperBuildError<u32, u32, u32>),
/// Creating or decoding a property layer failed.
Property(PropertyError),
/// The snapshot container could not be opened.
Snapshot(SnapshotError),
/// A hypergraph view could not be built from the snapshot.
BcsrSnapshot(BcsrSnapshotError),
/// A file-system or writer operation failed.
Io(io::Error),
/// The `vertex_name` layer was found but is not a dense string array.
UnexpectedLayerType {
/// Name of the offending layer.
name: String,
/// Data type reported for the layer's values.
actual: DataType,
},
/// The snapshot has no element-family layer named `vertex_name`.
MissingNameLayer,
/// The number of names does not line up with the number of vertices.
NameLayerLengthMismatch {
/// Number of vertices that need a name.
vertex_count: usize,
/// Number of name entries available (or required, when a vertex id
/// falls outside the expected range while exporting).
layer_len: usize,
},
}
impl fmt::Display for DemoError {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Yaml(error) => write!(formatter, "yaml: {error}"),
Self::Build(error) => write!(formatter, "build: {error}"),
Self::Property(error) => write!(formatter, "property: {error}"),
Self::Snapshot(error) => write!(formatter, "snapshot: {error}"),
Self::BcsrSnapshot(error) => write!(formatter, "bcsr snapshot: {error}"),
Self::Io(error) => write!(formatter, "io: {error}"),
Self::UnexpectedLayerType { name, actual } => {
write!(
formatter,
"expected Utf8 vertex_name layer `{name}`, found {actual}"
)
}
Self::MissingNameLayer => formatter.write_str("snapshot missing vertex_name layer"),
Self::NameLayerLengthMismatch {
vertex_count,
layer_len,
} => write!(
formatter,
"vertex_name layer length {layer_len} != {vertex_count} vertices"
),
}
}
}
impl std::error::Error for DemoError {}
impl From<ScanError> for DemoError {
fn from(error: ScanError) -> Self {
Self::Yaml(error)
}
}
impl From<HyperBuildError<u32, u32, u32>> for DemoError {
fn from(error: HyperBuildError<u32, u32, u32>) -> Self {
Self::Build(error)
}
}
impl From<PropertyError> for DemoError {
fn from(error: PropertyError) -> Self {
Self::Property(error)
}
}
impl From<SnapshotError> for DemoError {
fn from(error: SnapshotError) -> Self {
Self::Snapshot(error)
}
}
impl From<BcsrSnapshotError> for DemoError {
fn from(error: BcsrSnapshotError) -> Self {
Self::BcsrSnapshot(error)
}
}
impl From<io::Error> for DemoError {
fn from(error: io::Error) -> Self {
Self::Io(error)
}
}
/// A destination that accepts a fully serialized snapshot.
trait SnapshotSink {
    /// Writes all of `bytes` to the sink.
    ///
    /// # Errors
    /// Returns the I/O error reported by the underlying writer.
    fn write_snapshot(&mut self, bytes: &[u8]) -> io::Result<()>;
}

/// Blanket implementation: anything that implements [`Write`] (a buffered
/// file, a `Vec<u8>`, ...) is a snapshot sink that simply writes every byte.
impl<W> SnapshotSink for W
where
    W: Write,
{
    fn write_snapshot(&mut self, payload: &[u8]) -> io::Result<()> {
        Write::write_all(self, payload)
    }
}
/// In-memory model of the services file: which services exist and what each
/// one depends on.
struct ServicesModel {
/// Service name -> names of the services it depends on. A `BTreeMap` keeps
/// iteration (and therefore the emitted YAML and vertex order) sorted by name.
services: BTreeMap<String, Vec<String>>,
}
impl ServicesModel {
/// Creates a model that contains no services.
const fn new() -> Self {
    let services = BTreeMap::new();
    Self { services }
}
fn service_count(&self) -> usize {
self.services.len()
}
/// Returns the total number of dependency entries across all services
/// (the sum of every service's dependency-list length).
fn dependency_count(&self) -> usize {
    self.services
        .values()
        .fold(0, |total, deps| total + deps.len())
}
/// Adds the service `name` with the dependency list `deps`, replacing any
/// existing entry of the same name.
fn set_service<S>(&mut self, name: S, deps: Vec<String>)
where
    S: Into<String>,
{
    let key: String = name.into();
    // `insert` overwrites; a previous dependency list, if any, is discarded.
    drop(self.services.insert(key, deps));
}
/// Removes the service `name` and also strips it from every remaining
/// service's dependency list, so no dangling reference is left behind.
fn remove_service(&mut self, name: &str) {
    self.services.remove(name);
    self.services
        .values_mut()
        .for_each(|deps| deps.retain(|dep| dep.as_str() != name));
}
fn from_yaml(yaml: &str) -> Result<Self, DemoError> {
let mut ingest = YamlIngest::new();
let mut parser = Parser::new_from_str(yaml);
while let Some(event) = parser.next_event() {
let (event, _span) = event?;
ingest.handle(event);
}
Ok(ingest.into_model())
}
/// Rebuilds a model from serialized snapshot bytes.
///
/// Every vertex becomes a service (named by the `vertex_name` element layer);
/// for every hyperedge, each source participant gains all target participants
/// as dependencies, de-duplicated through `merge_deps`.
///
/// # Errors
/// * [`DemoError::Snapshot`] / [`DemoError::BcsrSnapshot`] / [`DemoError::Property`]
///   when the snapshot, the hypergraph view, or the property layers cannot be decoded.
/// * [`DemoError::MissingNameLayer`] when no element layer is called `vertex_name`.
/// * [`DemoError::UnexpectedLayerType`] when that layer is not a dense string array.
/// * [`DemoError::NameLayerLengthMismatch`] when the layer length differs from
///   the vertex count.
fn from_snapshot(bytes: &[u8]) -> Result<Self, DemoError> {
    let snapshot = Snapshot::open(bytes)?;
    let graph = BcsrSnapshotHypergraph::<u32, u32, u32>::from_snapshot(&snapshot)?;
    let decoded = DecodedPropertyLayer::decode_all::<u32>(&snapshot)?;

    let Some(name_layer) = decoded
        .iter()
        .find(|layer| layer.id_family == IdFamily::Element && layer.name == "vertex_name")
    else {
        return Err(DemoError::MissingNameLayer);
    };

    // Only a dense layer carries one value per vertex.
    let name_values = match &name_layer.data {
        DecodedPropertyData::Dense { values } => values,
        other => {
            return Err(DemoError::UnexpectedLayerType {
                name: name_layer.name.clone(),
                actual: other.data_type().cloned().unwrap_or(DataType::Utf8),
            });
        }
    };
    let Some(names) = name_values.as_any().downcast_ref::<StringArray>() else {
        return Err(DemoError::UnexpectedLayerType {
            name: name_layer.name.clone(),
            actual: name_values.data_type().clone(),
        });
    };

    let vertex_total = graph.vertex_count();
    if names.len() != vertex_total {
        return Err(DemoError::NameLayerLengthMismatch {
            vertex_count: vertex_total,
            layer_len: names.len(),
        });
    }

    // Seed every vertex as a service so dependency-free ones survive the trip.
    let mut services: BTreeMap<String, Vec<String>> = (0..names.len())
        .map(|row| (names.value(row).to_owned(), Vec::new()))
        .collect();

    // A hyperedge count that does not fit in `u32` is clamped to `u32::MAX`.
    let edge_total = u32::try_from(graph.hyperedge_count()).unwrap_or(u32::MAX);
    for raw_edge in 0..edge_total {
        let edge = BcsrHyperedgeId::new(raw_edge);
        let mut targets: Vec<String> = Vec::new();
        for target in graph.target_participants(edge) {
            targets.push(names.value(target.get() as usize).to_owned());
        }
        for source in graph.source_participants(edge) {
            let owner = names.value(source.get() as usize).to_owned();
            let deps = services.entry(owner).or_default();
            merge_deps(deps, &targets);
        }
    }

    Ok(Self { services })
}
fn to_yaml(&self) -> String {
let mut out = String::from("services:\n");
for (name, deps) in &self.services {
out.push_str(" ");
out.push_str(name);
if deps.is_empty() {
out.push_str(": {}\n");
} else {
out.push_str(":\n depends_on: [");
out.push_str(&deps.join(", "));
out.push_str("]\n");
}
}
out
}
fn save_snapshot_to(&self, path: &Path) -> Result<(), DemoError> {
let bytes = self.to_snapshot()?;
let mut sink = BufWriter::new(File::create(path)?);
sink.write_snapshot(&bytes)?;
sink.flush()?;
Ok(())
}
/// Writes the YAML rendering of the model to `path`, creating or truncating
/// the file.
///
/// # Errors
/// Returns the I/O error from creating, writing, or flushing the file.
fn save_yaml_to(&self, path: &Path) -> io::Result<()> {
    let file = File::create(path)?;
    let mut writer = BufWriter::new(file);
    let yaml = self.to_yaml();
    writer.write_all(yaml.as_bytes())?;
    // Flush explicitly so a late write failure is reported to the caller.
    writer.flush()
}
/// Serializes the model into snapshot bytes.
///
/// Every distinct name (services and the names they depend on) becomes one
/// vertex, allocated in sorted name order. Each service with at least one
/// dependency becomes one hyperedge with the service as its single source and
/// its dependencies as targets. The names themselves are stored in a dense
/// `vertex_name` element layer indexed by vertex id.
///
/// # Errors
/// * [`DemoError::Build`] when the builder rejects a vertex, a hyperedge, or the freeze.
/// * [`DemoError::NameLayerLengthMismatch`] when a vertex id falls outside
///   `0..number_of_names`.
/// * [`DemoError::Property`] when the name layer cannot be constructed.
fn to_snapshot(&self) -> Result<Vec<u8>, DemoError> {
    // Collect the sorted, de-duplicated set of every name mentioned anywhere.
    let mut vertex_names: BTreeSet<&str> = self.services.keys().map(String::as_str).collect();
    vertex_names.extend(self.services.values().flatten().map(String::as_str));

    let mut builder = HypergraphBuilder::<u32, u32, u32>::new();
    let mut id_by_name: BTreeMap<&str, HyperVertexId<u32>> = BTreeMap::new();
    for vertex_name in vertex_names {
        let vertex_id = builder.add_vertex()?;
        id_by_name.insert(vertex_name, vertex_id);
    }

    for (service, deps) in &self.services {
        let Some(&source_id) = id_by_name.get(service.as_str()) else {
            continue;
        };
        let target_ids: Vec<HyperVertexId<u32>> = deps
            .iter()
            .filter_map(|dep| id_by_name.get(dep.as_str()).copied())
            .collect();
        // Services without dependencies contribute a vertex but no hyperedge.
        if target_ids.is_empty() {
            continue;
        }
        builder.add_hyperedge(&[source_id], &target_ids)?;
    }
    let frozen = builder.freeze()?;

    // Lay the names out so that index == vertex id.
    let vertex_total = id_by_name.len();
    let mut ordered: Vec<&str> = vec![""; vertex_total];
    for (&vertex_name, vertex_id) in &id_by_name {
        let slot = vertex_id.get() as usize;
        match ordered.get_mut(slot) {
            Some(cell) => *cell = vertex_name,
            None => {
                return Err(DemoError::NameLayerLengthMismatch {
                    vertex_count: vertex_total,
                    layer_len: slot + 1,
                });
            }
        }
    }

    let descriptor = PropertyLayerDescriptor::<u32, u32>::try_new(
        LayerId(1_u32),
        "vertex_name",
        IdFamily::Element,
        LayerRole::Property,
        StorageMode::Dense,
        Field::new("vertex_name", DataType::Utf8, false),
    )?;
    let name_column = Arc::new(StringArray::from(ordered));
    let element_layers = [PropertyLayer::try_new_dense(descriptor, name_column)?];
    let property_layers = HyperPropertyLayers {
        element: &element_layers,
        relation: &[],
        incidence: &[],
    };
    let encoded = export_bcsr_snapshot_with_properties(&frozen, property_layers)?;
    Ok(encoded)
}
}
/// Event-driven YAML reader that fills a [`ServicesModel`] from a stream of
/// parser events.
struct YamlIngest {
/// Model being populated as events arrive.
model: ServicesModel,
/// Stack of open YAML containers; one frame is pushed for each mapping or
/// sequence start and popped on the matching end event.
frames: Vec<Frame>,
}
/// One open YAML container on the [`YamlIngest`] stack, tagged with the role
/// it plays in the services document.
enum Frame {
/// The document's root mapping.
TopMapping {
/// Key scalar seen most recently whose value has not arrived yet.
pending_key: Option<String>,
},
/// The mapping stored under the root `services` key.
ServicesMapping {
/// Service name seen most recently whose body has not arrived yet.
pending_service: Option<String>,
},
/// The mapping that forms one service's body.
ServiceBody {
/// Name of the service this body belongs to.
service_name: String,
/// Key scalar seen most recently whose value has not arrived yet.
pending_key: Option<String>,
},
/// The sequence stored under a service's `depends_on` key.
DependsOnSeq {
/// Name of the service that owns the sequence.
service_name: String,
/// Scalar entries collected so far.
deps: Vec<String>,
},
/// Any container the ingest does not interpret; its contents are ignored.
Skip,
}
/// Appends every string from `additions` that `entry` does not already
/// contain, preserving the existing order and the order of first appearance.
///
/// Duplicates inside `additions` are added only once, because each candidate
/// is checked against the list as it grows.
fn merge_deps(entry: &mut Vec<String>, additions: &[String]) {
    for candidate in additions {
        if entry.contains(candidate) {
            continue;
        }
        entry.push(candidate.clone());
    }
}
/// Tracks key/value alternation inside a mapping.
///
/// When no key is pending, `value` is a key and is stored. When a key is
/// already pending, `value` is that key's scalar value: the pair is complete,
/// so the slot is cleared and `value` is dropped.
fn assign_pending(pending: &mut Option<String>, value: Cow<'_, str>) {
    *pending = match pending.take() {
        Some(_completed_key) => None,
        None => Some(value.into_owned()),
    };
}
impl YamlIngest {
/// Creates an ingest with an empty model and no open containers.
const fn new() -> Self {
    let model = ServicesModel::new();
    let frames = Vec::new();
    Self { model, frames }
}
fn into_model(self) -> ServicesModel {
self.model
}
fn handle(&mut self, event: Event<'_>) {
match event {
Event::Scalar(value, ..) => self.handle_scalar(value),
Event::MappingStart(_, _) => self.handle_mapping_start(),
Event::SequenceStart(_, _) => self.handle_sequence_start(),
Event::MappingEnd | Event::SequenceEnd => self.handle_close(),
Event::Nothing
| Event::StreamStart
| Event::StreamEnd
| Event::DocumentStart(_)
| Event::DocumentEnd
| Event::Alias(_) => {}
}
}
/// Routes a scalar according to the innermost open container.
///
/// Inside a `depends_on` sequence the scalar is recorded as a dependency;
/// inside one of the tracked mappings it toggles the pending key through
/// `assign_pending`; outside any container, or inside a skipped one, it is
/// ignored.
fn handle_scalar(&mut self, value: Cow<'_, str>) {
    let Some(frame) = self.frames.last_mut() else {
        return;
    };
    match frame {
        Frame::DependsOnSeq { deps, .. } => deps.push(value.into_owned()),
        Frame::TopMapping { pending_key: slot }
        | Frame::ServicesMapping {
            pending_service: slot,
        }
        | Frame::ServiceBody {
            pending_key: slot, ..
        } => assign_pending(slot, value),
        Frame::Skip => {}
    }
}
/// Pushes the frame for a mapping that is just opening, chosen from the
/// innermost open container:
///
/// * no container open: this is the root mapping;
/// * root mapping: the mapping is tracked only when its key was `services`;
/// * services mapping: the pending name becomes a registered service and the
///   mapping is its body (with no pending name the mapping is skipped);
/// * service body, `depends_on` sequence, or skipped container: skipped.
///
/// In every mapping case the parent's pending key is consumed.
fn handle_mapping_start(&mut self) {
    let services = &mut self.model.services;
    let child = match self.frames.last_mut() {
        Some(Frame::Skip | Frame::DependsOnSeq { .. }) => Frame::Skip,
        Some(Frame::ServiceBody { pending_key, .. }) => {
            *pending_key = None;
            Frame::Skip
        }
        Some(Frame::ServicesMapping { pending_service }) => match pending_service.take() {
            Some(service_name) => {
                // Register the service now so it exists even without deps.
                services.entry(service_name.clone()).or_default();
                Frame::ServiceBody {
                    service_name,
                    pending_key: None,
                }
            }
            None => Frame::Skip,
        },
        Some(Frame::TopMapping { pending_key }) => match pending_key.take().as_deref() {
            Some("services") => Frame::ServicesMapping {
                pending_service: None,
            },
            _ => Frame::Skip,
        },
        None => Frame::TopMapping { pending_key: None },
    };
    self.frames.push(child);
}
/// Pushes the frame for a sequence that is just opening.
///
/// Only a sequence stored under a service's `depends_on` key is collected;
/// every other sequence is skipped. A sequence is always the value of the
/// parent mapping's pending key, so that key is consumed in every mapping
/// frame. Otherwise a document such as `volumes: [a]` followed by `services:`
/// would leave `volumes` pending, the `services` scalar would be mistaken for
/// its value, and the whole services section would be skipped.
fn handle_sequence_start(&mut self) {
    let new_frame = match self.frames.last_mut() {
        Some(Frame::ServiceBody {
            pending_key,
            service_name,
        }) => {
            if pending_key.take().as_deref() == Some("depends_on") {
                Frame::DependsOnSeq {
                    service_name: service_name.clone(),
                    deps: Vec::new(),
                }
            } else {
                Frame::Skip
            }
        }
        Some(
            Frame::TopMapping {
                pending_key: pending,
            }
            | Frame::ServicesMapping {
                pending_service: pending,
            },
        ) => {
            // The sequence completes the pending key/value pair.
            *pending = None;
            Frame::Skip
        }
        Some(Frame::DependsOnSeq { .. } | Frame::Skip) | None => Frame::Skip,
    };
    self.frames.push(new_frame);
}
fn handle_close(&mut self) {
if let Some(Frame::DependsOnSeq { service_name, deps }) = self.frames.pop()
&& !deps.is_empty()
{
self.model
.services
.entry(service_name)
.or_default()
.extend(deps);
}
}
}
/// Local extension trait: reports the Arrow data type of a decoded payload
/// when one is available.
trait DataTypeHint {
    /// Returns the data type of the stored values, or `None` when the payload
    /// kind is not one this trait knows how to inspect.
    fn data_type(&self) -> Option<&DataType>;
}

impl DataTypeHint for DecodedPropertyData {
    fn data_type(&self) -> Option<&DataType> {
        // Dense and sparse payloads both carry a values array; anything else
        // has no type to report here.
        let values = match self {
            Self::Dense { values } | Self::Sparse { values, .. } => values,
            _ => return None,
        };
        Some(values.data_type())
    }
}
/// Runs the demo end to end and prints a report for each stage:
///
/// 1. seed: parse the embedded YAML and write a snapshot to disk and to memory;
/// 2. read: load the snapshot from disk and print it as YAML;
/// 3. mutate: remove `worker`, add `metrics`;
/// 4. save: write the mutated snapshot, reload it, and write its YAML.
///
/// Both output files live in the system temporary directory.
///
/// # Errors
/// Returns the first [`DemoError`] raised by any stage.
fn main() -> Result<(), DemoError> {
    let temp_root = std::env::temp_dir();
    let snapshot_path = temp_root.join("services.oxsnap");
    let yaml_path = temp_root.join("services.written.yaml");

    // --- seed ---
    let seed = ServicesModel::from_yaml(SERVICES_YAML)?;
    seed.save_snapshot_to(&snapshot_path)?;
    let encoded = seed.to_snapshot()?;
    let mut in_memory: Vec<u8> = Vec::with_capacity(encoded.len());
    in_memory.write_snapshot(&encoded)?;
    println!("--- seed ---");
    println!("services = {}", seed.service_count());
    println!("deps = {}", seed.dependency_count());
    println!("snapshot = {} bytes", encoded.len());
    println!("disk sink = {}", snapshot_path.display());
    println!("memory sink = {} bytes", in_memory.len());

    // --- read ---
    let on_disk = fs::read(&snapshot_path)?;
    let mut working = ServicesModel::from_snapshot(&on_disk)?;
    println!("--- read ---");
    println!("source = {}", snapshot_path.display());
    println!("services = {}", working.service_count());
    println!("deps = {}", working.dependency_count());
    print!("{}", working.to_yaml());

    // --- mutate ---
    working.remove_service("worker");
    working.set_service("metrics", vec!["db".to_owned(), "cache".to_owned()]);
    println!("--- mutate ---");
    println!(
        "services = {} (after remove worker / add metrics)",
        working.service_count()
    );
    println!("deps = {}", working.dependency_count());

    // --- save ---
    working.save_snapshot_to(&snapshot_path)?;
    // Reload from disk so the YAML reflects exactly what the snapshot holds.
    let reloaded = fs::read(&snapshot_path)?;
    let canonical = ServicesModel::from_snapshot(&reloaded)?;
    canonical.save_yaml_to(&yaml_path)?;
    let saved_yaml = canonical.to_yaml();
    println!("--- save ---");
    println!("snapshot = {}", snapshot_path.display());
    println!(
        "yaml = {} bytes -> {}",
        saved_yaml.len(),
        yaml_path.display()
    );
    print!("{saved_yaml}");
    Ok(())
}