1use crate::inbound::{DataStreamRegistry, MediaFrameRegistry};
6use crate::lifecycle::compat_lock::{CompatLockManager, CompatibilityCheck};
7use crate::outbound::OutGate;
8use crate::wire::webrtc::SignalingClient;
9#[cfg(feature = "opentelemetry")]
10use crate::wire::webrtc::trace::inject_span_context_to_rpc;
11use actr_config::lock::LockFile;
12use actr_framework::{Bytes, Context, DataStream, Dest, MediaSample};
13use actr_protocol::{
14 AIdCredential, ActorResult, ActrError, ActrId, ActrType, PayloadType, ProtocolError,
15 RouteCandidatesRequest, RpcEnvelope, RpcRequest, route_candidates_request,
16};
17use async_trait::async_trait;
18use futures_util::future::BoxFuture;
19use std::path::PathBuf;
20use std::sync::Arc;
21
22#[derive(Clone)]
37pub struct RuntimeContext {
38 self_id: ActrId,
39 caller_id: Option<ActrId>,
40 request_id: String,
41 inproc_gate: OutGate, outproc_gate: Option<OutGate>, data_stream_registry: Arc<DataStreamRegistry>, media_frame_registry: Arc<MediaFrameRegistry>, signaling_client: Arc<dyn SignalingClient>,
46 credential: AIdCredential,
47 actr_lock: Option<LockFile>, config_dir: Option<PathBuf>, }
50
51impl RuntimeContext {
52 #[allow(clippy::too_many_arguments)] pub fn new(
69 self_id: ActrId,
70 caller_id: Option<ActrId>,
71 request_id: String,
72 inproc_gate: OutGate,
73 outproc_gate: Option<OutGate>,
74 data_stream_registry: Arc<DataStreamRegistry>,
75 media_frame_registry: Arc<MediaFrameRegistry>,
76 signaling_client: Arc<dyn SignalingClient>,
77 credential: AIdCredential,
78 actr_lock: Option<LockFile>,
79 config_dir: Option<PathBuf>,
80 ) -> Self {
81 Self {
82 self_id,
83 caller_id,
84 request_id,
85 inproc_gate,
86 outproc_gate,
87 data_stream_registry,
88 media_frame_registry,
89 signaling_client,
90 credential,
91 actr_lock,
92 config_dir,
93 }
94 }
95
96 #[inline]
102 fn select_gate(&self, dest: &Dest) -> ActorResult<&OutGate> {
103 match dest {
104 Dest::Shell | Dest::Local => Ok(&self.inproc_gate),
105 Dest::Actor(_) => self.outproc_gate.as_ref().ok_or_else(|| {
106 ProtocolError::Actr(ActrError::GateNotInitialized {
107 message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
108 .to_string(),
109 })
110 }),
111 }
112 }
113
114 #[inline]
120 fn extract_target_id<'a>(&'a self, dest: &'a Dest) -> &'a ActrId {
121 match dest {
122 Dest::Shell | Dest::Local => &self.self_id,
123 Dest::Actor(id) => id,
124 }
125 }
126
127 #[cfg_attr(
129 feature = "opentelemetry",
130 tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
131 )]
132 pub async fn call_raw(
133 &self,
134 target: &Dest,
135 route_key: String,
136 payload_type: PayloadType,
137 payload: Bytes,
138 timeout_ms: i64,
139 ) -> ActorResult<Bytes> {
140 #[cfg(feature = "opentelemetry")]
141 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
142
143 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
144 let mut envelope = RpcEnvelope {
145 route_key,
146 payload: Some(payload),
147 error: None,
148 traceparent: None,
149 tracestate: None,
150 request_id: uuid::Uuid::new_v4().to_string(),
151 metadata: vec![],
152 timeout_ms,
153 };
154 #[cfg(feature = "opentelemetry")]
155 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
156
157 let gate = self.select_gate(target)?;
158 let target_id = self.extract_target_id(target);
159 gate.send_request_with_type(target_id, payload_type, envelope)
160 .await
161 }
162
163 #[cfg_attr(
165 feature = "opentelemetry",
166 tracing::instrument(skip_all, name = "RuntimeContext.tell_raw")
167 )]
168 pub async fn tell_raw(
169 &self,
170 target: &Dest,
171 route_key: String,
172 payload_type: PayloadType,
173 payload: Bytes,
174 ) -> ActorResult<()> {
175 #[cfg(feature = "opentelemetry")]
176 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
177
178 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
179 let mut envelope = RpcEnvelope {
180 route_key,
181 payload: Some(payload),
182 error: None,
183 traceparent: None,
184 tracestate: None,
185 request_id: uuid::Uuid::new_v4().to_string(),
186 metadata: vec![],
187 timeout_ms: 0,
188 };
189 #[cfg(feature = "opentelemetry")]
190 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
191
192 let gate = self.select_gate(target)?;
193 let target_id = self.extract_target_id(target);
194 gate.send_message_with_type(target_id, payload_type, envelope)
195 .await
196 }
197
198 pub async fn send_data_stream_with_type(
203 &self,
204 target: &Dest,
205 payload_type: actr_protocol::PayloadType,
206 chunk: DataStream,
207 ) -> ActorResult<()> {
208 use actr_protocol::prost::Message as ProstMessage;
209
210 let payload = chunk.encode_to_vec();
211
212 let gate = self.select_gate(target)?;
213 let target_id = self.extract_target_id(target);
214
215 let result = gate
216 .send_data_stream(target_id, payload_type, bytes::Bytes::from(payload).into())
217 .await;
218
219 result
220 }
221
222 fn get_dependency_fingerprint(&self, target_type: &ActrType) -> Option<String> {
224 let actr_lock = self.actr_lock.as_ref()?;
225
226 let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
228 let actr_type_name = format!("{}+{}", target_type.manufacturer, target_type.name);
229
230 if let Some(dep) = actr_lock.get_dependency(&service_name) {
232 return Some(dep.fingerprint.clone());
233 }
234
235 if let Some(dep) = actr_lock.get_dependency(&actr_type_name) {
237 return Some(dep.fingerprint.clone());
238 }
239
240 if let Some(dep) = actr_lock.get_dependency(&target_type.name) {
242 return Some(dep.fingerprint.clone());
243 }
244
245 for dep in &actr_lock.dependencies {
247 if dep.actr_type == actr_type_name || dep.actr_type == target_type.name {
248 return Some(dep.fingerprint.clone());
249 }
250 }
251
252 None
253 }
254
255 async fn send_discovery_request(
257 &self,
258 target_type: &ActrType,
259 candidate_count: u32,
260 client_fingerprint: String,
261 ) -> ActorResult<InternalDiscoveryResult> {
262 let criteria = route_candidates_request::NodeSelectionCriteria {
263 candidate_count,
264 ranking_factors: Vec::new(),
265 minimal_dependency_requirement: None,
266 minimal_health_requirement: None,
267 };
268
269 let request = RouteCandidatesRequest {
270 target_type: target_type.clone(),
271 criteria: Some(criteria),
272 client_location: None,
273 client_fingerprint,
274 };
275
276 let response = self
277 .signaling_client
278 .send_route_candidates_request(self.self_id.clone(), self.credential.clone(), request)
279 .await
280 .map_err(|e| {
281 ProtocolError::TransportError(format!("Route candidates request failed: {e}"))
282 })?;
283
284 match response.result {
285 Some(actr_protocol::route_candidates_response::Result::Success(success)) => {
286 Ok(InternalDiscoveryResult {
287 candidates: success.candidates,
288 has_exact_match: success.has_exact_match.unwrap_or(false),
289 is_sub_healthy: success.is_sub_healthy.unwrap_or(false),
290 compatibility_info: success.compatibility_info,
291 })
292 }
293 Some(actr_protocol::route_candidates_response::Result::Error(err)) => {
294 Err(ProtocolError::TransportError(format!(
295 "Route candidates error {}: {}",
296 err.code, err.message
297 )))
298 }
299 None => Err(ProtocolError::TransportError(
300 "Invalid route candidates response: missing result".to_string(),
301 )),
302 }
303 }
304
305 async fn handle_negotiation_result(
307 &self,
308 target_type: &ActrType,
309 client_fingerprint: &str,
310 compatibility_info: &[actr_protocol::CandidateCompatibilityInfo],
311 has_exact_match: bool,
312 is_sub_healthy: bool,
313 ) {
314 let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
315
316 for info in compatibility_info {
318 let status = if info.is_exact_match.unwrap_or(false) {
319 "✅ 精确匹配"
320 } else if let Some(ref result) = info.analysis_result {
321 match result.level() {
322 actr_protocol::CompatibilityLevel::FullyCompatible => "✅ 完全兼容",
323 actr_protocol::CompatibilityLevel::BackwardCompatible => "⚠️ 向后兼容",
324 actr_protocol::CompatibilityLevel::BreakingChanges => "❌ 破坏性变更",
325 }
326 } else {
327 "❓ 未知"
328 };
329
330 tracing::debug!(
331 " - 候选 {}: {} (指纹: {})",
332 info.candidate_id.serial_number,
333 status,
334 &info.candidate_fingerprint[..20.min(info.candidate_fingerprint.len())]
335 );
336 }
337
338 if let Some(config_dir) = &self.config_dir {
340 if is_sub_healthy && !has_exact_match {
341 if let Some(resolved) = compatibility_info.first() {
343 tracing::warn!(
344 "🟡 SYSTEM SUB-HEALTHY: Service '{}' using compatible fingerprint ({}) \
345 instead of exact match ({}). Run 'actr install --force-update' to restore health.",
346 service_name,
347 &resolved.candidate_fingerprint
348 [..20.min(resolved.candidate_fingerprint.len())],
349 &client_fingerprint[..20.min(client_fingerprint.len())]
350 );
351
352 let mut manager = CompatLockManager::new(config_dir.clone());
354 if let Err(e) = manager
355 .record_negotiation(
356 &service_name,
357 client_fingerprint,
358 &resolved.candidate_fingerprint,
359 false, CompatibilityCheck::BackwardCompatible,
361 )
362 .await
363 {
364 tracing::warn!("Failed to update compat.lock.toml: {}", e);
365 }
366 }
367 } else if has_exact_match {
368 let mut manager = CompatLockManager::new(config_dir.clone());
370 if let Ok(Some(_)) = manager.load().await {
371 if let Some(resolved) = compatibility_info.first() {
372 if let Err(e) = manager
373 .record_negotiation(
374 &service_name,
375 client_fingerprint,
376 &resolved.candidate_fingerprint,
377 true, CompatibilityCheck::ExactMatch,
379 )
380 .await
381 {
382 tracing::debug!("Could not update compat.lock.toml: {}", e);
383 }
384 }
385 }
386 }
387 }
388 }
389}
390
391struct InternalDiscoveryResult {
393 candidates: Vec<ActrId>,
394 has_exact_match: bool,
395 is_sub_healthy: bool,
396 compatibility_info: Vec<actr_protocol::CandidateCompatibilityInfo>,
397}
398
399#[async_trait]
400impl Context for RuntimeContext {
401 fn self_id(&self) -> &ActrId {
404 &self.self_id
405 }
406
407 fn caller_id(&self) -> Option<&ActrId> {
408 self.caller_id.as_ref()
409 }
410
411 fn request_id(&self) -> &str {
412 &self.request_id
413 }
414
415 #[cfg_attr(
417 feature = "opentelemetry",
418 tracing::instrument(skip_all, name = "RuntimeContext.call")
419 )]
420 async fn call<R: RpcRequest>(&self, target: &Dest, request: R) -> ActorResult<R::Response> {
421 use actr_protocol::prost::Message as ProstMessage;
422
423 let payload: Bytes = request.encode_to_vec().into();
425
426 let route_key = R::route_key().to_string();
428
429 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
431 let mut envelope = RpcEnvelope {
432 route_key,
433 payload: Some(payload),
434 error: None,
435 traceparent: None,
436 tracestate: None,
437 request_id: uuid::Uuid::new_v4().to_string(), metadata: vec![],
439 timeout_ms: 30000, };
441 #[cfg(feature = "opentelemetry")]
443 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
444
445 let gate = self.select_gate(target)?;
447 let target_id = self.extract_target_id(target);
448
449 let response_bytes = gate
452 .send_request_with_type(target_id, R::payload_type(), envelope)
453 .await?;
454
455 R::Response::decode(&*response_bytes).map_err(|e| {
457 ProtocolError::Actr(ActrError::DecodeFailure {
458 message: format!(
459 "Failed to decode {}: {}",
460 std::any::type_name::<R::Response>(),
461 e
462 ),
463 })
464 })
465 }
466
467 #[cfg_attr(
468 feature = "opentelemetry",
469 tracing::instrument(skip_all, name = "RuntimeContext.tell")
470 )]
471 async fn tell<R: RpcRequest>(&self, target: &Dest, message: R) -> ActorResult<()> {
472 let payload: Bytes = message.encode_to_vec().into();
474
475 let route_key = R::route_key().to_string();
477
478 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
480 let mut envelope = RpcEnvelope {
481 route_key,
482 payload: Some(payload),
483 error: None,
484 traceparent: None,
485 tracestate: None,
486 request_id: uuid::Uuid::new_v4().to_string(),
487 metadata: vec![],
488 timeout_ms: 0, };
490 #[cfg(feature = "opentelemetry")]
492 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
493
494 let gate = self.select_gate(target)?;
496 let target_id = self.extract_target_id(target);
497
498 gate.send_message_with_type(target_id, R::payload_type(), envelope)
500 .await
501 }
502
503 async fn register_stream<F>(&self, stream_id: String, callback: F) -> ActorResult<()>
506 where
507 F: Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
508 {
509 tracing::debug!(
510 "📊 Registering DataStream callback for stream_id: {}",
511 stream_id
512 );
513 self.data_stream_registry
514 .register(stream_id, Arc::new(callback));
515 Ok(())
516 }
517
518 async fn unregister_stream(&self, stream_id: &str) -> ActorResult<()> {
519 tracing::debug!(
520 "🚫 Unregistering DataStream callback for stream_id: {}",
521 stream_id
522 );
523 self.data_stream_registry.unregister(stream_id);
524 Ok(())
525 }
526
527 async fn send_data_stream(&self, target: &Dest, chunk: DataStream) -> ActorResult<()> {
528 use actr_protocol::prost::Message as ProstMessage;
529
530 let payload = chunk.encode_to_vec();
532
533 tracing::debug!(
534 "📤 Sending DataStream: stream_id={}, sequence={}, size={} bytes",
535 chunk.stream_id,
536 chunk.sequence,
537 payload.len()
538 );
539
540 let gate = self.select_gate(target)?;
542 let target_id = self.extract_target_id(target);
543
544 gate.send_data_stream(
548 target_id,
549 actr_protocol::PayloadType::StreamReliable,
550 bytes::Bytes::from(payload),
551 )
552 .await
553 }
554
555 async fn discover_route_candidate(&self, target_type: &ActrType) -> ActorResult<ActrId> {
556 if !self.signaling_client.is_connected() {
557 return Err(ProtocolError::TransportError(
558 "Signaling client is not connected.".to_string(),
559 ));
560 }
561
562 let service_name = format!("{}/{}", target_type.manufacturer, target_type.name);
563
564 if let Some(config_dir) = &self.config_dir {
568 let mut compat_lock_manager = CompatLockManager::new(config_dir.clone());
569 if let Ok(Some(compat_lock)) = compat_lock_manager.load().await {
570 if let Some(cached_entry) = compat_lock.find_valid_entry(&service_name) {
571 tracing::info!(
572 "⚡ Fast path: Using cached negotiation for '{}' (resolved: {})",
573 service_name,
574 &cached_entry.resolved_fingerprint
575 [..20.min(cached_entry.resolved_fingerprint.len())]
576 );
577
578 let result = self
580 .send_discovery_request(
581 target_type,
582 1,
583 cached_entry.resolved_fingerprint.clone(),
584 )
585 .await?;
586
587 if let Some(candidate) = result.candidates.into_iter().next() {
588 tracing::info!(
589 "📊 服务发现结果 [{}]: 1 个候选 (快速路径, sub_healthy=true)",
590 service_name
591 );
592 return Ok(candidate);
593 }
594 tracing::warn!(
596 "⚠️ Fast path failed for '{}', falling back to normal discovery",
597 service_name
598 );
599 }
600 }
601 }
602
603 let client_fingerprint = self.get_dependency_fingerprint(target_type).ok_or_else(|| {
607 tracing::error!(
608 severity = 10,
609 error_category = "dependency_missing",
610 "❌ DEPENDENCY NOT FOUND: Service '{}' is not declared in Actr.lock.toml.\n\
611 Please run 'actr install' to generate the lock file with all dependencies.",
612 service_name
613 );
614 ProtocolError::Actr(ActrError::DependencyNotFound {
615 service_name: service_name.clone(),
616 message: format!(
617 "Dependency '{}' not found in Actr.lock.toml. Run 'actr install' to resolve dependencies.",
618 service_name
619 ),
620 })
621 })?;
622
623 tracing::debug!(
624 "📋 Found dependency fingerprint for '{}': {}",
625 service_name,
626 &client_fingerprint[..20.min(client_fingerprint.len())]
627 );
628
629 let result = self
633 .send_discovery_request(target_type, 1, client_fingerprint.clone())
634 .await?;
635
636 let has_exact_match = result.has_exact_match;
637 let is_sub_healthy = result.is_sub_healthy;
638
639 self.handle_negotiation_result(
643 target_type,
644 &client_fingerprint,
645 &result.compatibility_info,
646 has_exact_match,
647 is_sub_healthy,
648 )
649 .await;
650
651 tracing::info!(
653 "📊 服务发现结果 [{}]: {} 个候选, exact_match={}, sub_healthy={}",
654 service_name,
655 result.candidates.len(),
656 has_exact_match,
657 is_sub_healthy
658 );
659
660 result.candidates.into_iter().next().ok_or_else(|| {
661 ProtocolError::TargetNotFound(format!(
662 "No route candidates for type {}/{}",
663 target_type.manufacturer, target_type.name
664 ))
665 })
666 }
667
668 #[cfg_attr(
669 feature = "opentelemetry",
670 tracing::instrument(skip_all, name = "RuntimeContext.call_raw")
671 )]
672 async fn call_raw(
673 &self,
674 target: &ActrId,
675 route_key: &str,
676 payload: Bytes,
677 ) -> ActorResult<Bytes> {
678 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
680 let mut envelope = RpcEnvelope {
681 route_key: route_key.to_string(),
682 payload: Some(payload),
683 error: None,
684 traceparent: None,
685 tracestate: None,
686 request_id: uuid::Uuid::new_v4().to_string(),
687 metadata: vec![],
688 timeout_ms: 30000, };
690
691 #[cfg(feature = "opentelemetry")]
693 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
694
695 let gate = self.outproc_gate.as_ref().ok_or_else(|| {
697 ProtocolError::Actr(ActrError::GateNotInitialized {
698 message: "OutprocOutGate not initialized yet (WebRTC setup in progress)"
699 .to_string(),
700 })
701 })?;
702
703 gate.send_request(target, envelope).await
705 }
706
707 async fn register_media_track<F>(&self, track_id: String, callback: F) -> ActorResult<()>
710 where
711 F: Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync + 'static,
712 {
713 tracing::debug!(
714 "📹 Registering MediaTrack callback for track_id: {}",
715 track_id
716 );
717 self.media_frame_registry
718 .register(track_id, Arc::new(callback));
719 Ok(())
720 }
721
722 async fn unregister_media_track(&self, track_id: &str) -> ActorResult<()> {
723 tracing::debug!(
724 "📹 Unregistering MediaTrack callback for track_id: {}",
725 track_id
726 );
727 self.media_frame_registry.unregister(track_id);
728 Ok(())
729 }
730
731 async fn send_media_sample(
732 &self,
733 target: &Dest,
734 track_id: &str,
735 sample: MediaSample,
736 ) -> ActorResult<()> {
737 let gate = self.select_gate(target)?;
739
740 let target_id = self.extract_target_id(target);
742
743 gate.send_media_sample(target_id, track_id, sample).await
745 }
746}