runar_node 0.1.0

Runar Node implementation
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
// Registry Service Implementation
//
// INTENTION: Provide a consistent API for accessing service metadata through the
// standard request interface, eliminating the need for direct methods and aligning
// with the architectural principle of using the service request pattern for all operations.
//
// This service provides access to service metadata like states, actions, events, etc.
// through standard request paths like:
// - $registry/services/list
// - $registry/services/{service_path}
// - $registry/services/{service_path}/state

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use std::sync::Arc;

use crate::routing::TopicPath;
use crate::services::{LifecycleContext, RegistryDelegate, RequestContext};
use crate::{AbstractService, ServiceState};
use runar_common::logging::Logger;
use runar_macros_common::{log_debug, log_error, log_info};
use runar_serializer::ArcValue;

/// Registry Info Service - provides information about registered services without holding state
pub struct RegistryService {
    /// Logger instance
    logger: Arc<Logger>,

    /// Registry delegate for accessing node registry information
    registry_delegate: Arc<dyn RegistryDelegate>,
}

impl RegistryService {
    /// Create a new Registry Service
    pub fn new(logger: Arc<Logger>, delegate: Arc<dyn RegistryDelegate>) -> Self {
        RegistryService {
            logger,
            registry_delegate: delegate,
        }
    }

    /// Extract the service path parameter from the request context
    ///
    /// INTENTION: Extract the target service path (the service we want info about)
    /// from the path parameters extracted from the URL template.
    ///
    /// This is NOT about getting the context's handler service path.
    /// It's about extracting the service name from the URL pattern like:
    /// "$registry/services/math" where "math" is the service we want information about.
    /// it uses the context path_params which contains the extracted parameters from the template
    fn extract_service_path(&self, ctx: &RequestContext) -> Result<String> {
        // Look for the path parameter that was extracted during template matching
        if let Some(path) = ctx.path_params.get("service_path") {
            log_debug!(
                ctx.logger,
                "Using service_path '{path}' from path parameters"
            );
            return Ok(path.clone());
        }

        // If we get here, we couldn't find the target service path
        log_error!(ctx.logger, "Missing required 'service_path' parameter");
        Err(anyhow!("Missing required 'service_path' parameter"))
    }

    /// Register the list services action
    async fn register_list_services_action(&self, context: &LifecycleContext) -> Result<()> {
        let self_clone = self.clone();
        // Add debug to see what is registered
        log_debug!(
            context.logger,
            "Registering list_services handler with path: services/list"
        );
        context
            .register_action(
                "services/list",
                Arc::new(move |params, ctx| {
                    let inner_self = self_clone.clone();
                    Box::pin(async move {
                        inner_self
                            .handle_list_services(params.unwrap_or_else(ArcValue::null), ctx)
                            .await
                    })
                }),
            )
            .await?;
        log_debug!(context.logger, "Registered services/list action");
        Ok(())
    }

    /// Register the service info action
    async fn register_service_info_action(&self, context: &LifecycleContext) -> Result<()> {
        let self_clone = self.clone();

        // Add debug to see what is registered
        log_debug!(
            context.logger,
            "Registering service_info handler with path: services/{{service_path}}"
        );

        context
            .register_action(
                "services/{service_path}",
                Arc::new(move |params, ctx| {
                    let inner_self = self_clone.clone();
                    Box::pin(async move {
                        inner_self
                            .handle_service_info(params.unwrap_or_else(ArcValue::null), ctx)
                            .await
                    })
                }),
            )
            .await?;

        log_debug!(
            context.logger,
            "Registered services/{{service_path}} action"
        );
        Ok(())
    }

    /// Register the service state action
    async fn register_service_state_action(&self, context: &LifecycleContext) -> Result<()> {
        let self_clone = self.clone();

        // Add debug to see what is registered
        log_debug!(
            context.logger,
            "Registering service_state handler with path: services/{{service_path}}/state"
        );

        context
            .register_action(
                "services/{service_path}/state",
                Arc::new(move |params, ctx| {
                    let inner_self = self_clone.clone();
                    Box::pin(async move {
                        let is_local = params
                            .map(|p| p.as_type::<bool>().unwrap_or(true))
                            .unwrap_or(true);
                        inner_self.handle_service_state(is_local, ctx).await
                    })
                }),
            )
            .await?;

        log_debug!(
            context.logger,
            "Registered services/{{service_path}}/state action"
        );
        Ok(())
    }

    /// Register the pause service action
    async fn register_pause_action(&self, context: &LifecycleContext) -> Result<()> {
        let self_clone = self.clone();

        // Add debug to see what is registered
        log_debug!(
            context.logger,
            "Registering pause handler with path: services/{{service_path}}/pause"
        );

        context
            .register_action(
                "services/{service_path}/pause",
                Arc::new(move |_params, ctx| {
                    let inner_self = self_clone.clone();
                    Box::pin(async move { inner_self.handle_pause_service(ctx).await })
                }),
            )
            .await?;

        log_debug!(
            context.logger,
            "Registered services/{{service_path}}/pause action"
        );
        Ok(())
    }

    /// Register the resume service action
    async fn register_resume_action(&self, context: &LifecycleContext) -> Result<()> {
        let self_clone = self.clone();

        // Add debug to see what is registered
        log_debug!(
            context.logger,
            "Registering resume handler with path: services/{{service_path}}/resume"
        );

        context
            .register_action(
                "services/{service_path}/resume",
                Arc::new(move |_params, ctx| {
                    let inner_self = self_clone.clone();
                    Box::pin(async move { inner_self.handle_resume_service(ctx).await })
                }),
            )
            .await?;

        log_debug!(
            context.logger,
            "Registered services/{{service_path}}/resume action"
        );
        Ok(())
    }

    /// Handler for listing all services
    async fn handle_list_services(
        &self,
        _params: ArcValue,
        ctx: RequestContext,
    ) -> Result<ArcValue> {
        log_debug!(ctx.logger, "Listing all services");

        // Get all service metadata directly
        let service_metadata = self
            .registry_delegate
            .get_all_service_metadata(true)
            .await?;

        // Convert the HashMap of ServiceMetadata to a Vec
        let metadata_vec: Vec<ArcValue> = service_metadata
            .values()
            .map(|metadata| ArcValue::new_struct(metadata.clone()))
            .collect();

        // Return the list of service metadata
        Ok(ArcValue::new_list(metadata_vec))
    }

    /// Handler for getting detailed information about a specific service
    async fn handle_service_info(
        &self,
        _params: ArcValue,
        ctx: RequestContext,
    ) -> Result<ArcValue> {
        // Extract the service path from path parameters
        let actual_service_path = match self.extract_service_path(&ctx) {
            Ok(path) => path,
            Err(error) => return Err(error),
        };

        // Get the service metadata for the specific service path
        let network_id_string = ctx.network_id().clone();
        let service_topic = match TopicPath::new(&actual_service_path, &network_id_string) {
            Ok(topic) => topic,
            Err(e) => {
                return Err(anyhow!("Invalid service path format: {}", e));
            }
        };

        // Return the service metadata if found, or None if not found
        if let Some(service_metadata) = self
            .registry_delegate
            .get_service_metadata(&service_topic)
            .await
        {
            Ok(ArcValue::new_struct(service_metadata.clone()))
        } else {
            log_debug!(ctx.logger, "Service '{actual_service_path}' not found");
            Ok(ArcValue::null())
        }
    }

    /// Handler for getting just the state of a service
    async fn handle_service_state(&self, is_local: bool, ctx: RequestContext) -> Result<ArcValue> {
        // Extract the service path from path parameters
        let service_path = match self.extract_service_path(&ctx) {
            Ok(path) => path,
            Err(error) => {
                return Err(anyhow!(
                    "Missing required 'service_path' parameter: {}",
                    error
                ));
            }
        };
        let network_id_string = ctx.network_id().clone();
        let service_topic = TopicPath::new_service(&network_id_string, &service_path);

        log_debug!(
            ctx.logger,
            "Getting service state for '{service_path}' (is_local: {is_local})"
        );

        // Get service state based on is_local parameter
        let service_state = if is_local {
            self.registry_delegate
                .get_local_service_state(&service_topic)
                .await
        } else {
            self.registry_delegate
                .get_remote_service_state(&service_topic)
                .await
        };

        match service_state {
            Some(state) => Ok(ArcValue::new_struct(state)),
            None => {
                log_debug!(
                    ctx.logger,
                    "Service '{service_path}' not found (param is_local: {is_local})"
                );
                Ok(ArcValue::new_struct(ServiceState::Unknown))
            }
        }
    }

    /// Handler for pausing a service
    async fn handle_pause_service(&self, ctx: RequestContext) -> Result<ArcValue> {
        // Extract the service path from path parameters
        let service_path = match self.extract_service_path(&ctx) {
            Ok(path) => path,
            Err(error) => {
                return Err(anyhow!(
                    "Missing required 'service_path' parameter: {}",
                    error
                ));
            }
        };
        let network_id_string = ctx.network_id().clone();
        let service_topic = TopicPath::new_service(&network_id_string, &service_path);

        // Validate that the service can be paused
        self.registry_delegate
            .validate_pause_transition(&service_topic)
            .await?;

        // Get current state and update to Paused
        if let Some(current_state) = self
            .registry_delegate
            .get_local_service_state(&service_topic)
            .await
        {
            self.registry_delegate
                .update_local_service_state_if_valid(
                    &service_topic,
                    ServiceState::Paused,
                    current_state,
                )
                .await?;

            log_info!(ctx.logger, "Service '{service_path}' paused successfully");
            Ok(ArcValue::new_struct(ServiceState::Paused))
        } else {
            log_debug!(ctx.logger, "Service '{service_path}' not found");
            Ok(ArcValue::null())
        }
    }

    /// Handler for resuming a service
    async fn handle_resume_service(&self, ctx: RequestContext) -> Result<ArcValue> {
        // Extract the service path from path parameters
        let service_path = match self.extract_service_path(&ctx) {
            Ok(path) => path,
            Err(error) => {
                return Err(anyhow!(
                    "Missing required 'service_path' parameter: {}",
                    error
                ));
            }
        };
        let network_id_string = ctx.network_id().clone();
        let service_topic = TopicPath::new_service(&network_id_string, &service_path);

        // Validate that the service can be resumed
        self.registry_delegate
            .validate_resume_transition(&service_topic)
            .await?;

        // Get current state and update to Running
        if let Some(current_state) = self
            .registry_delegate
            .get_local_service_state(&service_topic)
            .await
        {
            self.registry_delegate
                .update_local_service_state_if_valid(
                    &service_topic,
                    ServiceState::Running,
                    current_state,
                )
                .await?;

            log_info!(ctx.logger, "Service '{service_path}' resumed successfully");
            Ok(ArcValue::new_struct(ServiceState::Running))
        } else {
            log_debug!(ctx.logger, "Service '{service_path}' not found");
            Ok(ArcValue::null())
        }
    }
}

#[async_trait]
impl AbstractService for RegistryService {
    fn name(&self) -> &str {
        "registry"
    }

    fn path(&self) -> &str {
        "$registry"
    }

    fn version(&self) -> &str {
        "1.0.0"
    }

    fn description(&self) -> &str {
        "Registry service for service discovery and metadata"
    }

    // internal services is not bound to any specificy network
    fn network_id(&self) -> Option<String> {
        None
    }
    fn set_network_id(&mut self, _network_id: String) {}

    /// Initialize the Registry Service by registering all handlers
    ///
    /// INTENTION: Set up all the action handlers for the registry service,
    /// following the path template pattern for consistent parameter extraction.
    /// Each path template defines a specific API endpoint with parameters.
    async fn init(&self, context: LifecycleContext) -> Result<()> {
        log_info!(context.logger, "Initializing Registry Service");

        // Register all actions with their template patterns
        log_debug!(
            context.logger,
            "Registering Registry Service action handlers"
        );

        // Services list does not require parameters
        self.register_list_services_action(&context).await?;
        log_debug!(
            context.logger,
            "Registered handler for listing all services"
        );

        // Service info uses the {service_path} parameter
        self.register_service_info_action(&context).await?;
        log_debug!(
            context.logger,
            "Registered handler for service info with path parameter"
        );

        // Service state also uses the {service_path} parameter
        self.register_service_state_action(&context).await?;
        log_debug!(
            context.logger,
            "Registered handler for service state with path parameter"
        );

        // Pause service uses the {service_path} parameter
        self.register_pause_action(&context).await?;
        log_debug!(
            context.logger,
            "Registered handler for pause service with path parameter"
        );

        // Resume service uses the {service_path} parameter
        self.register_resume_action(&context).await?;
        log_debug!(
            context.logger,
            "Registered handler for resume service with path parameter"
        );

        log_info!(context.logger, "Registry Service initialization complete");

        Ok(())
    }

    async fn start(&self, context: LifecycleContext) -> Result<()> {
        log_info!(context.logger, "Starting Registry Service");
        Ok(())
    }

    async fn stop(&self, context: LifecycleContext) -> Result<()> {
        log_info!(context.logger, "Stopping Registry Service");
        Ok(())
    }
}

// Implement Clone manually since we can't derive it due to async_trait
impl Clone for RegistryService {
    fn clone(&self) -> Self {
        Self {
            logger: self.logger.clone(),
            registry_delegate: self.registry_delegate.clone(),
        }
    }
}