plexus-core 0.5.3

Core infrastructure for Plexus RPC: Activation trait, DynamicHub, schemas
Documentation
use super::methods::HealthMethod;
use super::types::HealthEvent;
use crate::plexus::{wrap_stream, PlexusError, PlexusStream, Activation, ChildRouter, PlexusStreamItem, StreamMetadata, PlexusContext};
use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::core::{server::Methods, SubscriptionResult};
use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink};
use serde_json::Value;
use std::pin::Pin;
use std::time::Instant;

/// Health RPC interface
#[rpc(server, namespace = "health")]
pub trait HealthRpc {
    /// Check health status (streaming subscription)
    #[subscription(name = "check", unsubscribe = "unsubscribe_check", item = serde_json::Value)]
    async fn check(&self) -> SubscriptionResult;
}

/// Health activation - minimal reference implementation
///
/// This activation demonstrates the caller-wraps architecture.
/// The `check_stream` method returns typed domain events (HealthEvent),
/// and the `call` method wraps them using `wrap_stream`.
#[derive(Clone)]
pub struct Health {
    start_time: Instant,
}

impl Health {
    /// Namespace for the health plugin
    pub const NAMESPACE: &'static str = "health";
    /// Version of the health plugin
    pub const VERSION: &'static str = "1.0.0";
    /// Stable plugin instance ID for handle routing (same formula as hub_methods macro)
    /// Generated from "health@1" (namespace + major version)
    pub const PLUGIN_ID: uuid::Uuid = uuid::uuid!("dc560257-b7c5-575b-b893-b448c87ca797");

    pub fn new() -> Self {
        Self {
            start_time: Instant::now(),
        }
    }

    /// Returns typed stream - caller will wrap with metadata
    fn check_stream(
        &self,
    ) -> Pin<Box<dyn Stream<Item = HealthEvent> + Send + 'static>> {
        let uptime = self.start_time.elapsed().as_secs();

        Box::pin(stream! {
            yield HealthEvent::Status {
                status: "healthy".to_string(),
                uptime_seconds: uptime,
                timestamp: chrono::Utc::now().timestamp(),
            };
        })
    }
}

impl Default for Health {
    fn default() -> Self {
        Self::new()
    }
}

/// RPC server implementation
#[async_trait]
impl HealthRpcServer for Health {
    async fn check(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        let sink = pending.accept().await?;

        // Get wrapped stream
        let stream = self.check_stream();
        let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);

        // Forward all items to sink
        tokio::spawn(async move {
            let mut stream = wrapped;
            while let Some(item) = stream.next().await {
                if let Ok(raw_value) = serde_json::value::to_raw_value(&item) {
                    if sink.send(raw_value).await.is_err() {
                        break;
                    }
                }
            }
            // Send done event
            let done = PlexusStreamItem::Done {
                metadata: StreamMetadata::new(vec!["health".into()], PlexusContext::hash()),
            };
            if let Ok(raw_value) = serde_json::value::to_raw_value(&done) {
                let _ = sink.send(raw_value).await;
            }
        });

        Ok(())
    }
}

/// Activation trait implementation - unified interface for Plexus
#[async_trait]
impl Activation for Health {
    type Methods = HealthMethod;

    fn namespace(&self) -> &str {
        "health"
    }

    fn version(&self) -> &str {
        "1.0.0"
    }

    fn plugin_id(&self) -> uuid::Uuid {
        Self::PLUGIN_ID
    }

    fn description(&self) -> &str {
        "Check hub health and uptime"
    }

    fn methods(&self) -> Vec<&str> {
        vec!["check", "schema"]
    }

    fn method_help(&self, method: &str) -> Option<String> {
        match method {
            "schema" => Some("Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.".to_string()),
            _ => HealthMethod::description(method).map(|s| s.to_string()),
        }
    }

    async fn call(&self, method: &str, params: Value, _auth: Option<&crate::plexus::AuthContext>, _raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
        match method {
            "check" => {
                let stream = self.check_stream();
                Ok(wrap_stream(stream, "health.status", vec!["health".into()]))
            }
            "schema" => {
                use crate::plexus::SchemaResult;

                // Check if a specific method was requested
                let method_name: Option<String> = params.get("method")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string());

                let plugin_schema = self.plugin_schema();

                let result = if let Some(ref name) = method_name {
                    // Find the specific method
                    plugin_schema.methods.iter()
                        .find(|m| m.name == *name)
                        .map(|m| SchemaResult::Method(m.clone()))
                        .ok_or_else(|| PlexusError::MethodNotFound {
                            activation: "health".to_string(),
                            method: name.clone(),
                        })?
                } else {
                    // Return full plugin schema
                    SchemaResult::Plugin(plugin_schema)
                };

                Ok(wrap_stream(
                    futures::stream::once(async move { result }),
                    "health.schema",
                    vec!["health".into()]
                ))
            }
            _ => {
                // Check for {method}.schema pattern (e.g., "check.schema")
                // Only if the prefix is an actual local method
                if let Some(method_name) = method.strip_suffix(".schema") {
                    use crate::plexus::SchemaResult;

                    let plugin_schema = self.plugin_schema();
                    if let Some(m) = plugin_schema.methods.iter().find(|m| m.name == method_name) {
                        let result = SchemaResult::Method(m.clone());
                        return Ok(wrap_stream(
                            futures::stream::once(async move { result }),
                            "health.method_schema",
                            vec!["health".into()]
                        ));
                    }
                }

                Err(PlexusError::MethodNotFound {
                    activation: "health".to_string(),
                    method: method.to_string(),
                })
            }
        }
    }

    fn into_rpc_methods(self) -> Methods {
        // Register RPC subscription methods
        self.into_rpc().into()
    }
}

/// ChildRouter implementation for Health — leaf activation, no children.
#[async_trait]
impl ChildRouter for Health {
    fn router_namespace(&self) -> &str {
        "health"
    }

    async fn router_call(&self, method: &str, params: Value, auth: Option<&crate::plexus::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
        self.call(method, params, auth, raw_ctx).await
    }

    async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
        None
    }
}