import requests
import json
import pyarrow as pa
import pyarrow.ipc as ipc
from typing import List, Dict, Optional, BinaryIO
import io
class IPFRSClient:
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
def add_file(self, file_path: str) -> Dict[str, any]:
url = f"{self.base_url}/api/v0/add"
with open(file_path, 'rb') as f:
files = {'file': f}
response = self.session.post(url, files=files)
response.raise_for_status()
return response.json()
def cat(self, cid: str) -> bytes:
url = f"{self.base_url}/api/v0/cat"
response = self.session.post(url, params={'arg': cid})
response.raise_for_status()
return response.content
def get_block(self, cid: str) -> bytes:
url = f"{self.base_url}/api/v0/block/get"
response = self.session.post(url, params={'arg': cid})
response.raise_for_status()
return response.content
def put_block(self, data: bytes) -> Dict[str, any]:
url = f"{self.base_url}/api/v0/block/put"
files = {'block': io.BytesIO(data)}
response = self.session.post(url, files=files)
response.raise_for_status()
return response.json()
def get(self, cid: str, byte_range: Optional[tuple] = None) -> bytes:
url = f"{self.base_url}/ipfs/{cid}"
headers = {}
if byte_range:
start, end = byte_range
headers['Range'] = f'bytes={start}-{end}'
response = self.session.get(url, headers=headers)
response.raise_for_status()
return response.content
def batch_get_blocks(self, cids: List[str]) -> List[Dict[str, any]]:
url = f"{self.base_url}/v1/block/batch/get"
payload = {'cids': cids}
response = self.session.post(url, json=payload)
response.raise_for_status()
return response.json()['blocks']
def batch_has_blocks(self, cids: List[str]) -> List[Dict[str, any]]:
url = f"{self.base_url}/v1/block/batch/has"
payload = {'cids': cids}
response = self.session.post(url, json=payload)
response.raise_for_status()
return response.json()['results']
def streaming_upload(self, file_path: str) -> Dict[str, any]:
url = f"{self.base_url}/v1/stream/upload"
with open(file_path, 'rb') as f:
files = {'file': f}
response = self.session.post(url, files=files)
response.raise_for_status()
return response.json()
def streaming_download(self, cid: str, chunk_size: int = 65536) -> bytes:
url = f"{self.base_url}/v1/stream/download/{cid}"
params = {'chunk_size': chunk_size}
response = self.session.get(url, params=params, stream=True)
response.raise_for_status()
chunks = []
for chunk in response.iter_content(chunk_size=chunk_size):
chunks.append(chunk)
return b''.join(chunks)
def get_tensor(self, cid: str, slice_spec: Optional[str] = None) -> bytes:
url = f"{self.base_url}/v1/tensor/{cid}"
params = {}
if slice_spec:
params['slice'] = slice_spec
response = self.session.get(url, params=params)
response.raise_for_status()
return response.content
def get_tensor_info(self, cid: str) -> Dict[str, any]:
url = f"{self.base_url}/v1/tensor/{cid}/info"
response = self.session.get(url)
response.raise_for_status()
return response.json()
def get_tensor_arrow(self, cid: str, slice_spec: Optional[str] = None) -> pa.Table:
url = f"{self.base_url}/v1/tensor/{cid}/arrow"
params = {}
if slice_spec:
params['slice'] = slice_spec
response = self.session.get(url, params=params)
response.raise_for_status()
reader = ipc.open_stream(response.content)
table = reader.read_all()
shape_str = response.headers.get('X-Tensor-Shape', '[]')
dtype_str = response.headers.get('X-Tensor-Dtype', 'unknown')
metadata = {
'tensor_shape': shape_str,
'tensor_dtype': dtype_str,
'tensor_elements': response.headers.get('X-Tensor-Elements', '0')
}
return table, metadata
def get_id(self) -> Dict[str, any]:
url = f"{self.base_url}/api/v0/id"
response = self.session.post(url)
response.raise_for_status()
return response.json()
def get_version(self) -> Dict[str, any]:
url = f"{self.base_url}/api/v0/version"
response = self.session.post(url)
response.raise_for_status()
return response.json()
def get_peers(self) -> List[Dict[str, any]]:
url = f"{self.base_url}/api/v0/swarm/peers"
response = self.session.post(url)
response.raise_for_status()
return response.json().get('Peers', [])
def get_bandwidth_stats(self) -> Dict[str, any]:
url = f"{self.base_url}/api/v0/stats/bw"
response = self.session.post(url)
response.raise_for_status()
return response.json()
def main():
client = IPFRSClient("http://localhost:8080")
print("=== IPFRS Python Client Example ===\n")
print("1. File Upload and Download")
try:
test_file = "/tmp/test_ipfrs.txt"
with open(test_file, 'w') as f:
f.write("Hello, IPFRS! This is a test file.")
result = client.add_file(test_file)
cid = result['Hash']
print(f" Uploaded file: {cid} ({result['Size']} bytes)")
content = client.cat(cid)
print(f" Downloaded: {content.decode()}")
except Exception as e:
print(f" Error: {e}")
print("\n2. Batch Block Operations")
try:
cids_to_check = [cid] results = client.batch_has_blocks(cids_to_check)
for result in results:
print(f" {result['cid']}: exists={result['exists']}")
except Exception as e:
print(f" Error: {e}")
print("\n3. Tensor Operations with Apache Arrow")
try:
print(" (Skipping - no tensor CID available)")
except Exception as e:
print(f" Error: {e}")
print("\n4. Node Information")
try:
version = client.get_version()
print(f" Version: {version.get('Version', 'unknown')}")
print(f" System: {version.get('System', 'unknown')}")
except Exception as e:
print(f" Error: {e}")
print("\n=== Example Complete ===")
if __name__ == "__main__":
main()